From d32b46ca285f971be1cf2dd86cc36cdbdcf1a2ef Mon Sep 17 00:00:00 2001 From: test01 Date: Sun, 4 Jan 2026 01:43:36 +0800 Subject: [PATCH] chore: ignore test files --- .gitignore | 4 + edgetunnel_proxy.py | 289 ---------------------------------------- test.py | 9 -- test_api.py | 16 --- test_download.py | 260 ------------------------------------ test_httpx.py | 239 --------------------------------- test_impersonate.py | 190 -------------------------- test_impersonate_new.py | 137 ------------------- 8 files changed, 4 insertions(+), 1140 deletions(-) delete mode 100644 edgetunnel_proxy.py delete mode 100644 test.py delete mode 100644 test_api.py delete mode 100644 test_download.py delete mode 100644 test_httpx.py delete mode 100644 test_impersonate.py delete mode 100644 test_impersonate_new.py diff --git a/.gitignore b/.gitignore index 54d424a..7f859e6 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,7 @@ dist/ build/ *.egg-info/ __pycache__/ + +# 测试文件 +test*.py +edgetunnel_proxy.py diff --git a/edgetunnel_proxy.py b/edgetunnel_proxy.py deleted file mode 100644 index 988b4c3..0000000 --- a/edgetunnel_proxy.py +++ /dev/null @@ -1,289 +0,0 @@ -""" -edgetunnel to HTTP Proxy -将 edgetunnel Workers 转换为本地 HTTP 代理 - -使用方法: - python edgetunnel_proxy.py --host v2.kami666.xyz --uuid your-uuid --port 8080 - -然后可以使用: - curl -x http://127.0.0.1:8080 https://httpbin.org/ip -""" - -import argparse -import socket -import struct -import threading -import ssl -import time -import uuid as uuid_module -from urllib.parse import urlparse - - -class VlessProxy: - """VLESS to HTTP Proxy""" - - def __init__(self, edgetunnel_host, vless_uuid, local_port=8080): - self.edgetunnel_host = edgetunnel_host - self.vless_uuid = vless_uuid - self.local_port = local_port - self.running = False - self.server_socket = None - - def _create_vless_header(self, target_host, target_port): - """创建 VLESS 请求头""" - header = bytes([0]) # 版本 - header += uuid_module.UUID(self.vless_uuid).bytes # UUID - header += bytes([0]) # 附加信息长度 - header += bytes([1]) # TCP 命令 - header += struct.pack('>H', target_port) # 端口 - - # 地址 - try: - socket.inet_aton(target_host) - header += bytes([1]) - header += socket.inet_aton(target_host) - except socket.error: - header += bytes([2]) - domain_bytes = target_host.encode('utf-8') - header += bytes([len(domain_bytes)]) - header += domain_bytes - - return header - - def _websocket_handshake(self, sock): - """WebSocket 握手""" - import base64 - import hashlib - import os - - key = base64.b64encode(os.urandom(16)).decode() - path = f'/{self.vless_uuid}' - - request = ( - f"GET {path} HTTP/1.1\r\n" - f"Host: {self.edgetunnel_host}\r\n" - f"Upgrade: websocket\r\n" - f"Connection: Upgrade\r\n" - f"Sec-WebSocket-Key: {key}\r\n" - f"Sec-WebSocket-Version: 13\r\n" - f"\r\n" - ) - - sock.sendall(request.encode()) - response = b"" - while b"\r\n\r\n" not in response: - chunk = sock.recv(1024) - if not chunk: - raise Exception("WebSocket handshake failed") - response += chunk - - if b"101" not in response: - raise Exception(f"WebSocket upgrade failed: {response[:200]}") - - return True - - def _send_ws_frame(self, sock, data): - """发送 WebSocket 帧""" - import os - mask_key = os.urandom(4) - length = len(data) - - if length <= 125: - header = bytes([0x82, 0x80 | length]) - elif length <= 65535: - header = bytes([0x82, 0x80 | 126]) + struct.pack('>H', length) - else: - header = bytes([0x82, 0x80 | 127]) + struct.pack('>Q', length) - - masked_data = bytes([data[i] ^ mask_key[i % 4] for i in range(length)]) - sock.sendall(header + mask_key + masked_data) - - def _recv_ws_frame(self, sock): - """接收 WebSocket 帧""" - header = sock.recv(2) - if len(header) < 2: - return None - - payload_len = header[1] & 0x7F - if payload_len == 126: - ext = sock.recv(2) - payload_len = struct.unpack('>H', ext)[0] - elif payload_len == 127: - ext = sock.recv(8) - payload_len = struct.unpack('>Q', ext)[0] - - data = b"" - while len(data) < payload_len: - chunk = sock.recv(min(payload_len - len(data), 8192)) - if not chunk: - break - data += chunk - - return data - - def _handle_client(self, client_socket, client_addr): - """处理客户端连接""" - try: - request = b"" - while b"\r\n\r\n" not in request: - chunk = client_socket.recv(4096) - if not chunk: - return - request += chunk - - first_line = request.split(b"\r\n")[0].decode() - parts = first_line.split() - - if len(parts) < 3: - return - - method = parts[0] - - if method == "CONNECT": - # HTTPS 代理 - host_port = parts[1] - if ":" in host_port: - target_host, target_port = host_port.rsplit(":", 1) - target_port = int(target_port) - else: - target_host = host_port - target_port = 443 - - initial_data = b"" - else: - # HTTP 代理 - url = parts[1] - parsed = urlparse(url) - target_host = parsed.hostname - target_port = parsed.port or 80 - - # 重写请求 - path = parsed.path or "/" - if parsed.query: - path += "?" + parsed.query - - new_first_line = f"{method} {path} HTTP/1.1\r\n" - rest = request.split(b"\r\n", 1)[1] - initial_data = new_first_line.encode() + rest - - # 连接 edgetunnel - ws_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ws_sock.settimeout(30) - - context = ssl.create_default_context() - ws_sock = context.wrap_socket(ws_sock, server_hostname=self.edgetunnel_host) - ws_sock.connect((self.edgetunnel_host, 443)) - - self._websocket_handshake(ws_sock) - - # 发送 VLESS 头 + 初始数据 - vless_header = self._create_vless_header(target_host, target_port) - self._send_ws_frame(ws_sock, vless_header + initial_data) - - if method == "CONNECT": - client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n") - - # 双向转发 - ws_sock.setblocking(False) - client_socket.setblocking(False) - - first_response = True - while True: - readable = [] - try: - client_socket.setblocking(False) - ws_sock.setblocking(False) - - import select - readable, _, _ = select.select([client_socket, ws_sock], [], [], 30) - except: - break - - if not readable: - break - - for sock in readable: - try: - if sock is client_socket: - data = client_socket.recv(8192) - if not data: - raise Exception("Client closed") - self._send_ws_frame(ws_sock, data) - else: - ws_sock.setblocking(True) - ws_sock.settimeout(1) - data = self._recv_ws_frame(ws_sock) - if data is None: - raise Exception("WS closed") - - # 跳过 VLESS 响应头 - if first_response and len(data) >= 2: - addon_len = data[1] - data = data[2 + addon_len:] - first_response = False - - if data: - client_socket.sendall(data) - except Exception as e: - raise - - except Exception as e: - pass - finally: - try: - client_socket.close() - except: - pass - - def start(self): - """启动代理服务器""" - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.server_socket.bind(('127.0.0.1', self.local_port)) - self.server_socket.listen(100) - self.running = True - - print(f"[*] edgetunnel HTTP Proxy started") - print(f"[*] edgetunnel: {self.edgetunnel_host}") - print(f"[*] UUID: {self.vless_uuid}") - print(f"[*] Local proxy: http://127.0.0.1:{self.local_port}") - print(f"[*] Usage: curl -x http://127.0.0.1:{self.local_port} https://httpbin.org/ip") - print() - - while self.running: - try: - client_socket, client_addr = self.server_socket.accept() - thread = threading.Thread(target=self._handle_client, args=(client_socket, client_addr)) - thread.daemon = True - thread.start() - except Exception as e: - if self.running: - print(f"[!] Accept error: {e}") - - def stop(self): - """停止代理服务器""" - self.running = False - if self.server_socket: - self.server_socket.close() - - -def main(): - parser = argparse.ArgumentParser(description='edgetunnel to HTTP Proxy') - parser.add_argument('--host', '-H', required=True, help='edgetunnel Workers host (e.g., v2.kami666.xyz)') - parser.add_argument('--uuid', '-u', required=True, help='VLESS UUID') - parser.add_argument('--port', '-p', type=int, default=8080, help='Local proxy port (default: 8080)') - - args = parser.parse_args() - - proxy = VlessProxy(args.host, args.uuid, args.port) - - try: - proxy.start() - except KeyboardInterrupt: - print("\n[*] Stopping...") - proxy.stop() - - -if __name__ == '__main__': - main() - diff --git a/test.py b/test.py deleted file mode 100644 index 001207f..0000000 --- a/test.py +++ /dev/null @@ -1,9 +0,0 @@ -import cfspider -import requests - -worker_url = "ip.kami666.xyz" -cf_response = cfspider.get("https://httpbin.org/ip", cf_proxies=worker_url) -req_response = requests.get("https://httpbin.org/ip") - -print(cf_response.text) -print(req_response.text) \ No newline at end of file diff --git a/test_api.py b/test_api.py deleted file mode 100644 index d63beb0..0000000 --- a/test_api.py +++ /dev/null @@ -1,16 +0,0 @@ -import cfspider - -print("1. 无代理:") -r = cfspider.get("https://httpbin.org/ip") -print(f" IP: {r.json()['origin']}") - -print("2. Workers代理 (cf_workers=True):") -r = cfspider.get("https://httpbin.org/ip", cf_proxies="cfspider.violetqqcom.workers.dev") -print(f" IP: {r.json()['origin']}") - -print("3. 普通代理 (cf_workers=False):") -r = cfspider.get("https://httpbin.org/ip", cf_proxies="127.0.0.1:9674", cf_workers=False) -print(f" IP: {r.json()['origin']}") - -print("\nDone!") - diff --git a/test_download.py b/test_download.py deleted file mode 100644 index ef3a89f..0000000 --- a/test_download.py +++ /dev/null @@ -1,260 +0,0 @@ -""" -测试 cfspider 文件下载功能(流式响应) -""" -import asyncio -import os -import sys -sys.path.insert(0, '.') - -import cfspider - -# Workers 地址 -CF_WORKERS = "https://ip.kami666.xyz" - -# 测试文件 URL -TEST_FILES = [ - { - "name": "小文件 (JSON)", - "url": "https://httpbin.org/json", - "filename": "test_json.json" - }, - { - "name": "中等文件 (robots.txt)", - "url": "https://www.google.com/robots.txt", - "filename": "test_robots.txt" - }, - { - "name": "图片文件 (PNG)", - "url": "https://httpbin.org/image/png", - "filename": "test_image.png" - } -] - -async def test_stream_download_no_proxy(): - """测试流式下载 - 无代理""" - print("\n" + "="*60) - print("测试 1: 流式下载 - 无代理模式") - print("="*60) - - url = "https://httpbin.org/bytes/10240" # 10KB 随机字节 - filename = "test_bytes_no_proxy.bin" - - try: - total_bytes = 0 - async with cfspider.astream("GET", url) as response: - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - - with open(filename, "wb") as f: - async for chunk in response.aiter_bytes(chunk_size=1024): - f.write(chunk) - total_bytes += len(chunk) - - file_size = os.path.getsize(filename) - print(f"下载完成: {filename}") - print(f"文件大小: {file_size} bytes") - os.remove(filename) - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_stream_download_workers(): - """测试流式下载 - Workers API 代理""" - print("\n" + "="*60) - print("测试 2: 流式下载 - Workers API 代理") - print("="*60) - - url = "https://httpbin.org/bytes/10240" # 10KB 随机字节 - filename = "test_bytes_workers.bin" - - try: - total_bytes = 0 - async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response: - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - print(f"CF Colo: {response.cf_colo}") - - with open(filename, "wb") as f: - async for chunk in response.aiter_bytes(chunk_size=1024): - f.write(chunk) - total_bytes += len(chunk) - - file_size = os.path.getsize(filename) - print(f"下载完成: {filename}") - print(f"文件大小: {file_size} bytes") - os.remove(filename) - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_download_image(): - """测试下载图片文件""" - print("\n" + "="*60) - print("测试 3: 下载图片文件 - Workers API 代理") - print("="*60) - - url = "https://httpbin.org/image/png" - filename = "test_image.png" - - try: - async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response: - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - print(f"Content-Type: {response.headers.get('content-type', 'N/A')}") - - with open(filename, "wb") as f: - async for chunk in response.aiter_bytes(): - f.write(chunk) - - file_size = os.path.getsize(filename) - print(f"下载完成: {filename}") - print(f"文件大小: {file_size} bytes") - - # 验证 PNG 文件头 - with open(filename, "rb") as f: - header = f.read(8) - if header[:4] == b'\x89PNG': - print("文件类型验证: ✓ 有效的 PNG 文件") - else: - print("文件类型验证: ✗ 无效的 PNG 文件") - - os.remove(filename) - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_download_text(): - """测试下载文本文件""" - print("\n" + "="*60) - print("测试 4: 下载文本文件 - Workers API 代理") - print("="*60) - - url = "https://www.google.com/robots.txt" - filename = "test_robots.txt" - - try: - lines = [] - async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response: - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - - async for line in response.aiter_lines(): - lines.append(line) - if len(lines) <= 5: - print(f" {line}") - - print(f"...") - print(f"总行数: {len(lines)}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_large_download(): - """测试大文件下载""" - print("\n" + "="*60) - print("测试 5: 大文件下载 (100KB) - 无代理") - print("="*60) - - url = "https://httpbin.org/bytes/102400" # 100KB - filename = "test_large.bin" - - try: - import time - start = time.time() - - async with cfspider.astream("GET", url) as response: - print(f"状态码: {response.status_code}") - - with open(filename, "wb") as f: - async for chunk in response.aiter_bytes(chunk_size=8192): - f.write(chunk) - - elapsed = time.time() - start - file_size = os.path.getsize(filename) - speed = file_size / elapsed / 1024 # KB/s - - print(f"下载完成: {filename}") - print(f"文件大小: {file_size / 1024:.1f} KB") - print(f"耗时: {elapsed:.2f}s") - print(f"速度: {speed:.1f} KB/s") - - os.remove(filename) - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_session_stream(): - """测试 Session 流式下载""" - print("\n" + "="*60) - print("测试 6: Session 流式下载 - Workers API 代理") - print("="*60) - - try: - async with cfspider.AsyncSession(cf_proxies=CF_WORKERS) as session: - async with session.stream("GET", "https://httpbin.org/bytes/5120") as response: - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - - data = await response.aread() - print(f"数据大小: {len(data)} bytes") - - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def main(): - print("="*60) - print("CFspider 文件下载测试") - print("="*60) - print(f"Workers 地址: {CF_WORKERS}") - - results = [] - - results.append(await test_stream_download_no_proxy()) - results.append(await test_stream_download_workers()) - results.append(await test_download_image()) - results.append(await test_download_text()) - results.append(await test_large_download()) - results.append(await test_session_stream()) - - # 结果汇总 - print("\n" + "="*60) - print("测试结果汇总") - print("="*60) - - tests = [ - "流式下载 - 无代理", - "流式下载 - Workers API", - "下载图片文件", - "下载文本文件", - "大文件下载 (100KB)", - "Session 流式下载" - ] - - passed = 0 - failed = 0 - for i, (test, result) in enumerate(zip(tests, results)): - status = "✓ 通过" if result else "✗ 失败" - print(f"{i+1}. {test}: {status}") - if result: - passed += 1 - else: - failed += 1 - - print(f"\n总计: {passed} 通过, {failed} 失败") - -if __name__ == "__main__": - asyncio.run(main()) - diff --git a/test_httpx.py b/test_httpx.py deleted file mode 100644 index 31dd2a7..0000000 --- a/test_httpx.py +++ /dev/null @@ -1,239 +0,0 @@ -""" -测试 cfspider httpx 异步功能 -""" -import asyncio -import sys -sys.path.insert(0, '.') - -import cfspider - -# Workers 地址 -CF_WORKERS = "https://ip.kami666.xyz" - -async def test_async_no_proxy(): - """测试异步请求 - 无代理""" - print("\n" + "="*60) - print("测试 1: 异步请求 - 无代理模式") - print("="*60) - - try: - response = await cfspider.aget("https://httpbin.org/ip") - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_async_workers_proxy(): - """测试异步请求 - Workers API 代理""" - print("\n" + "="*60) - print("测试 2: 异步请求 - Workers API 代理") - print("="*60) - - try: - response = await cfspider.aget( - "https://httpbin.org/ip", - cf_proxies=CF_WORKERS - ) - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - print(f"CF Colo: {response.cf_colo}") - print(f"CF Ray: {response.cf_ray}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_async_post(): - """测试异步 POST 请求""" - print("\n" + "="*60) - print("测试 3: 异步 POST 请求 - Workers API 代理") - print("="*60) - - try: - response = await cfspider.apost( - "https://httpbin.org/post", - cf_proxies=CF_WORKERS, - json={"name": "cfspider", "version": "1.3.0", "feature": "httpx"} - ) - print(f"状态码: {response.status_code}") - print(f"HTTP 版本: {response.http_version}") - data = response.json() - print(f"发送的 JSON: {data.get('json', {})}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_async_session(): - """测试异步 Session""" - print("\n" + "="*60) - print("测试 4: 异步 Session - Workers API 代理") - print("="*60) - - try: - async with cfspider.AsyncSession(cf_proxies=CF_WORKERS) as session: - # 第一个请求 - r1 = await session.get("https://httpbin.org/ip") - print(f"请求 1 - 状态码: {r1.status_code}, HTTP 版本: {r1.http_version}") - - # 第二个请求 - r2 = await session.post("https://httpbin.org/post", json={"test": 1}) - print(f"请求 2 - 状态码: {r2.status_code}, HTTP 版本: {r2.http_version}") - - # 第三个请求 - r3 = await session.get("https://httpbin.org/headers") - print(f"请求 3 - 状态码: {r3.status_code}, HTTP 版本: {r3.http_version}") - - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_async_session_no_proxy(): - """测试异步 Session - 无代理""" - print("\n" + "="*60) - print("测试 5: 异步 Session - 无代理模式") - print("="*60) - - try: - async with cfspider.AsyncSession() as session: - r1 = await session.get("https://httpbin.org/ip") - print(f"请求 1 - 状态码: {r1.status_code}, HTTP 版本: {r1.http_version}") - print(f"响应: {r1.text}") - - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def test_concurrent_requests(): - """测试并发请求""" - print("\n" + "="*60) - print("测试 6: 并发请求 - Workers API 代理") - print("="*60) - - try: - urls = [ - "https://httpbin.org/ip", - "https://httpbin.org/headers", - "https://httpbin.org/user-agent" - ] - - async def fetch(url): - return await cfspider.aget(url, cf_proxies=CF_WORKERS) - - import time - start = time.time() - results = await asyncio.gather(*[fetch(url) for url in urls]) - elapsed = time.time() - start - - for i, r in enumerate(results): - print(f"请求 {i+1} - 状态码: {r.status_code}, HTTP 版本: {r.http_version}") - - print(f"并发 3 个请求耗时: {elapsed:.2f}s") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -def test_sync_http2(): - """测试同步请求 HTTP/2""" - print("\n" + "="*60) - print("测试 7: 同步请求 HTTP/2 - 无代理") - print("="*60) - - try: - response = cfspider.get( - "https://httpbin.org/ip", - http2=True - ) - print(f"状态码: {response.status_code}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -def test_sync_http2_workers(): - """测试同步请求 HTTP/2 - Workers 代理""" - print("\n" + "="*60) - print("测试 8: 同步请求 HTTP/2 - Workers API 代理") - print("="*60) - - try: - response = cfspider.get( - "https://httpbin.org/ip", - cf_proxies=CF_WORKERS, - http2=True - ) - print(f"状态码: {response.status_code}") - print(f"CF Colo: {response.cf_colo}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - -async def main(): - print("="*60) - print("CFspider httpx 功能测试") - print("="*60) - print(f"Workers 地址: {CF_WORKERS}") - - results = [] - - # 异步测试 - results.append(await test_async_no_proxy()) - results.append(await test_async_workers_proxy()) - results.append(await test_async_post()) - results.append(await test_async_session()) - results.append(await test_async_session_no_proxy()) - results.append(await test_concurrent_requests()) - - # 同步 HTTP/2 测试 - results.append(test_sync_http2()) - results.append(test_sync_http2_workers()) - - # 结果汇总 - print("\n" + "="*60) - print("测试结果汇总") - print("="*60) - - tests = [ - "异步请求 - 无代理", - "异步请求 - Workers API", - "异步 POST - Workers API", - "异步 Session - Workers API", - "异步 Session - 无代理", - "并发请求 - Workers API", - "同步 HTTP/2 - 无代理", - "同步 HTTP/2 - Workers API" - ] - - passed = 0 - failed = 0 - for i, (test, result) in enumerate(zip(tests, results)): - status = "✓ 通过" if result else "✗ 失败" - print(f"{i+1}. {test}: {status}") - if result: - passed += 1 - else: - failed += 1 - - print(f"\n总计: {passed} 通过, {failed} 失败") - -if __name__ == "__main__": - asyncio.run(main()) - diff --git a/test_impersonate.py b/test_impersonate.py deleted file mode 100644 index 7c0cac2..0000000 --- a/test_impersonate.py +++ /dev/null @@ -1,190 +0,0 @@ -""" -测试 cfspider TLS 指纹模拟功能 -""" -import sys -sys.path.insert(0, '.') - -import cfspider - -# Workers 地址 -CF_WORKERS = "https://ip.kami666.xyz" - - -def test_impersonate_get(): - """测试 TLS 指纹模拟 GET 请求""" - print("\n" + "="*60) - print("测试 1: TLS 指纹模拟 GET 请求 - Chrome 131") - print("="*60) - - try: - response = cfspider.impersonate_get( - "https://tls.browserleaks.com/json", - impersonate="chrome131" - ) - print(f"状态码: {response.status_code}") - data = response.json() - print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}") - print(f"JA4: {data.get('ja4', 'N/A')}") - print(f"Akamai Hash: {data.get('akamai_hash', 'N/A')}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_impersonate_safari(): - """测试 Safari 指纹""" - print("\n" + "="*60) - print("测试 2: TLS 指纹模拟 GET 请求 - Safari 18") - print("="*60) - - try: - response = cfspider.impersonate_get( - "https://tls.browserleaks.com/json", - impersonate="safari18_0" - ) - print(f"状态码: {response.status_code}") - data = response.json() - print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}") - print(f"JA4: {data.get('ja4', 'N/A')}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_impersonate_firefox(): - """测试 Firefox 指纹""" - print("\n" + "="*60) - print("测试 3: TLS 指纹模拟 GET 请求 - Firefox 133") - print("="*60) - - try: - response = cfspider.impersonate_get( - "https://tls.browserleaks.com/json", - impersonate="firefox133" - ) - print(f"状态码: {response.status_code}") - data = response.json() - print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}") - print(f"JA4: {data.get('ja4', 'N/A')}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_impersonate_workers(): - """测试 TLS 指纹 + Workers 代理""" - print("\n" + "="*60) - print("测试 4: TLS 指纹 + Workers 代理") - print("="*60) - - try: - response = cfspider.impersonate_get( - "https://httpbin.org/ip", - impersonate="chrome131", - cf_proxies=CF_WORKERS - ) - print(f"状态码: {response.status_code}") - print(f"CF Colo: {response.cf_colo}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_impersonate_session(): - """测试 TLS 指纹会话""" - print("\n" + "="*60) - print("测试 5: TLS 指纹会话") - print("="*60) - - try: - with cfspider.ImpersonateSession(impersonate="chrome131") as session: - r1 = session.get("https://httpbin.org/ip") - print(f"请求 1 - 状态码: {r1.status_code}") - - r2 = session.post("https://httpbin.org/post", json={"test": 1}) - print(f"请求 2 - 状态码: {r2.status_code}") - - r3 = session.get("https://httpbin.org/headers") - print(f"请求 3 - 状态码: {r3.status_code}") - - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_supported_browsers(): - """测试获取支持的浏览器列表""" - print("\n" + "="*60) - print("测试 6: 支持的浏览器列表") - print("="*60) - - try: - browsers = cfspider.get_supported_browsers() - print(f"支持的浏览器数量: {len(browsers)}") - print(f"Chrome: {[b for b in browsers if 'chrome' in b]}") - print(f"Safari: {[b for b in browsers if 'safari' in b]}") - print(f"Firefox: {[b for b in browsers if 'firefox' in b]}") - print(f"Edge: {[b for b in browsers if 'edge' in b]}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def main(): - print("="*60) - print("CFspider TLS 指纹模拟功能测试") - print("="*60) - print(f"Workers 地址: {CF_WORKERS}") - - results = [] - - results.append(test_impersonate_get()) - results.append(test_impersonate_safari()) - results.append(test_impersonate_firefox()) - results.append(test_impersonate_workers()) - results.append(test_impersonate_session()) - results.append(test_supported_browsers()) - - # 结果汇总 - print("\n" + "="*60) - print("测试结果汇总") - print("="*60) - - tests = [ - "Chrome 131 指纹", - "Safari 18 指纹", - "Firefox 133 指纹", - "指纹 + Workers 代理", - "指纹会话", - "支持的浏览器列表" - ] - - passed = 0 - failed = 0 - for i, (test, result) in enumerate(zip(tests, results)): - status = "✓ 通过" if result else "✗ 失败" - print(f"{i+1}. {test}: {status}") - if result: - passed += 1 - else: - failed += 1 - - print(f"\n总计: {passed} 通过, {failed} 失败") - - -if __name__ == "__main__": - main() - diff --git a/test_impersonate_new.py b/test_impersonate_new.py deleted file mode 100644 index d8bf3bd..0000000 --- a/test_impersonate_new.py +++ /dev/null @@ -1,137 +0,0 @@ -""" -测试 cfspider.get() 直接使用 impersonate 参数 -""" -import sys -sys.path.insert(0, '.') - -import cfspider - -CF_WORKERS = "https://ip.kami666.xyz" - - -def test_get_impersonate(): - """测试 get() 直接使用 impersonate""" - print("\n" + "="*60) - print("测试 1: cfspider.get() + impersonate='chrome131'") - print("="*60) - - try: - response = cfspider.get( - "https://tls.browserleaks.com/json", - impersonate="chrome131" - ) - print(f"状态码: {response.status_code}") - data = response.json() - print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_get_impersonate_workers(): - """测试 get() + impersonate + Workers 代理""" - print("\n" + "="*60) - print("测试 2: cfspider.get() + impersonate + cf_proxies") - print("="*60) - - try: - response = cfspider.get( - "https://httpbin.org/ip", - impersonate="chrome131", - cf_proxies=CF_WORKERS - ) - print(f"状态码: {response.status_code}") - print(f"CF Colo: {response.cf_colo}") - print(f"响应: {response.text}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_post_impersonate(): - """测试 post() + impersonate""" - print("\n" + "="*60) - print("测试 3: cfspider.post() + impersonate='safari18_0'") - print("="*60) - - try: - response = cfspider.post( - "https://httpbin.org/post", - impersonate="safari18_0", - json={"test": "data"} - ) - print(f"状态码: {response.status_code}") - data = response.json() - print(f"POST 数据: {data.get('json')}") - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def test_different_browsers(): - """测试不同浏览器指纹""" - print("\n" + "="*60) - print("测试 4: 不同浏览器指纹对比") - print("="*60) - - browsers = ["chrome131", "safari18_0", "firefox133"] - - try: - for browser in browsers: - response = cfspider.get( - "https://tls.browserleaks.com/json", - impersonate=browser - ) - data = response.json() - print(f"{browser}: JA3={data.get('ja3_hash', 'N/A')[:16]}...") - - print("✓ 测试通过") - return True - except Exception as e: - print(f"✗ 测试失败: {e}") - return False - - -def main(): - print("="*60) - print("cfspider.get() impersonate 参数测试") - print("="*60) - - results = [] - - results.append(test_get_impersonate()) - results.append(test_get_impersonate_workers()) - results.append(test_post_impersonate()) - results.append(test_different_browsers()) - - # 结果汇总 - print("\n" + "="*60) - print("测试结果汇总") - print("="*60) - - tests = [ - "get() + impersonate", - "get() + impersonate + Workers", - "post() + impersonate", - "不同浏览器指纹" - ] - - passed = sum(results) - failed = len(results) - passed - - for i, (test, result) in enumerate(zip(tests, results)): - status = "✓ 通过" if result else "✗ 失败" - print(f"{i+1}. {test}: {status}") - - print(f"\n总计: {passed} 通过, {failed} 失败") - - -if __name__ == "__main__": - main() -