#!/usr/bin/env python3
"""Connectivity test for Webshare SOCKS5 proxies.

Each proxy is used to fetch http://httpbin.org/ip; a proxy counts as working
when the request returns HTTP 200, and the reported origin IP and the elapsed
time are printed.

Usage:
    python3 test-webshare-proxies.py           # test every proxy concurrently
    python3 test-webshare-proxies.py N         # test only proxy number N (1-based)
    python3 test-webshare-proxies.py fast      # test the first five proxies in order
    python3 test-webshare-proxies.py working   # test all, save working ones to a file

Note: `requests` needs its optional SOCKS extra (`requests[socks]`) to use
socks5:// proxy URLs.
"""

import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Proxy list copied from proxies_socks5.txt.
# NOTE(review): these URLs embed a username and password in source control;
# consider loading them from the file or the environment instead.
WEBSHARE_PROXIES = [
    "socks5://fbkjstyt:lvo4zphp2wwj@45.196.40.191:6269",
    "socks5://fbkjstyt:lvo4zphp2wwj@130.180.228.168:6452",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.1.154.35:7926",
    "socks5://fbkjstyt:lvo4zphp2wwj@63.141.62.186:6479",
    "socks5://fbkjstyt:lvo4zphp2wwj@216.170.122.181:6219",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.53.67.209:5758",
    "socks5://fbkjstyt:lvo4zphp2wwj@130.180.231.18:8160",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.53.142.239:5936",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.22:5686",
    "socks5://fbkjstyt:lvo4zphp2wwj@216.98.254.253:6563",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.46.188.237:5896",
    "socks5://fbkjstyt:lvo4zphp2wwj@45.56.161.56:8932",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.46.201.252:6766",
    "socks5://fbkjstyt:lvo4zphp2wwj@45.196.50.62:6384",
    "socks5://fbkjstyt:lvo4zphp2wwj@193.160.83.42:6363",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.234:5898",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.62:6622",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.239:6799",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.62:5726",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.138.21:6247"
]

# Endpoint that echoes the caller's IP, and the per-request timeout in seconds.
_TEST_URL = 'http://httpbin.org/ip'
_TIMEOUT_SECONDS = 15

# Serializes result lines: test_single_proxy runs on several worker threads.
_PRINT_LOCK = threading.Lock()


def _report(prefix, outcome):
    """Print one complete result line without interleaving across threads."""
    with _PRINT_LOCK:
        print(f"{prefix} {outcome}", flush=True)


def test_single_proxy(index, proxy_url):
    """Probe a single proxy and print a one-line result.

    The line is printed in one piece once the outcome is known, so that
    concurrent workers cannot mix their output.

    Args:
        index: 0-based position of the proxy (displayed 1-based).
        proxy_url: Proxy URL used for both the http and https schemes.

    Returns:
        A tuple ``(index, success, ip, elapsed, error)``. On success ``ip`` is
        the ``origin`` field of the JSON reply, ``elapsed`` is the duration in
        seconds and ``error`` is None. On failure ``ip`` is None, ``elapsed``
        is 0 and ``error`` is a short description.
    """
    prefix = f"[{index+1:2d}] 测试 {proxy_url} ..."
    proxies = {
        'http': proxy_url,
        'https': proxy_url
    }

    try:
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments during the request.
        started = time.perf_counter()
        # The target is plain HTTP, so no TLS options are passed.
        response = requests.get(
            _TEST_URL,
            proxies=proxies,
            timeout=_TIMEOUT_SECONDS
        )

        if response.status_code == 200:
            ip = response.json().get('origin', 'Unknown')
            elapsed = time.perf_counter() - started
            _report(prefix, f"✓ {ip} ({elapsed:.2f}s)")
            return index, True, ip, elapsed, None

        _report(prefix, f"✗ HTTP {response.status_code}")
        return index, False, None, 0, f"HTTP {response.status_code}"

    except requests.exceptions.Timeout:
        _report(prefix, "✗ 超时")
        return index, False, None, 0, "Timeout"
    except requests.exceptions.ProxyError as e:
        _report(prefix, f"✗ 代理错误: {str(e)[:50]}")
        return index, False, None, 0, "Proxy Error"
    except Exception as e:
        # Best-effort probe: any other failure (bad JSON, missing SOCKS
        # support, connection reset, ...) is reported, not raised.
        _report(prefix, f"✗ 错误: {str(e)[:50]}")
        return index, False, None, 0, str(e)[:50]


def test_all_proxies(max_workers=10):
    """Probe every proxy in WEBSHARE_PROXIES concurrently and print a summary.

    Args:
        max_workers: Size of the thread pool used for the probes.

    Returns:
        A tuple ``(working_proxies, failed_proxies)``, both sorted by index.
        Working entries are ``(index, proxy_url, ip, elapsed)``; failed
        entries are ``(index, proxy_url, error)``.
    """
    print(f"开始测试 {len(WEBSHARE_PROXIES)} 个 Webshare SOCKS5 代理...")
    print("=" * 80)

    working_proxies = []
    failed_proxies = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index it was submitted for.
        futures = {
            executor.submit(test_single_proxy, i, proxy): i
            for i, proxy in enumerate(WEBSHARE_PROXIES)
        }

        for future in as_completed(futures):
            submitted_index = futures[future]
            try:
                index, success, ip, elapsed, error = future.result()
            except Exception as e:
                with _PRINT_LOCK:
                    print(f"任务执行错误: {e}")
                # Count the proxy as failed so it does not vanish from the
                # summary totals.
                failed_proxies.append(
                    (submitted_index, WEBSHARE_PROXIES[submitted_index], str(e)[:50])
                )
                continue

            if success:
                working_proxies.append((index, WEBSHARE_PROXIES[index], ip, elapsed))
            else:
                failed_proxies.append((index, WEBSHARE_PROXIES[index], error))

    # Completion order is arbitrary; present results in list order.
    working_proxies.sort(key=lambda entry: entry[0])
    failed_proxies.sort(key=lambda entry: entry[0])

    print("=" * 80)
    print("测试完成!")
    print(f"✓ 工作正常: {len(working_proxies)} 个")
    print(f"✗ 连接失败: {len(failed_proxies)} 个")

    if working_proxies:
        print("\n可用代理列表:")
        for index, proxy, ip, elapsed in working_proxies:
            print(f"  [{index+1:2d}] {ip} ({elapsed:.2f}s) - {proxy}")

    if failed_proxies:
        print("\n失败代理列表:")
        for index, proxy, error in failed_proxies:
            print(f"  [{index+1:2d}] {error} - {proxy}")

    # Response-time statistics over the working proxies only.
    if working_proxies:
        speeds = [elapsed for _, _, _, elapsed in working_proxies]
        avg_speed = sum(speeds) / len(speeds)
        print(f"\n平均响应时间: {avg_speed:.2f}s")
        print(f"最快代理: {min(speeds):.2f}s")
        print(f"最慢代理: {max(speeds):.2f}s")

    return working_proxies, failed_proxies


def test_specific_proxy(index):
    """Probe the proxy at a 0-based index of WEBSHARE_PROXIES.

    Returns:
        The result tuple of test_single_proxy, or None (after printing an
        error) when the index is out of range.
    """
    if 0 <= index < len(WEBSHARE_PROXIES):
        return test_single_proxy(index, WEBSHARE_PROXIES[index])

    print(f"错误: 代理索引 {index} 超出范围 (0-{len(WEBSHARE_PROXIES)-1})")
    return None


def main():
    """Dispatch on the first command-line argument (see the module docstring)."""
    if len(sys.argv) <= 1:
        # No argument: test every proxy.
        test_all_proxies()
        return

    arg = sys.argv[1]

    # Only the integer parse is guarded, so a ValueError raised elsewhere is
    # never mistaken for "argument is a keyword".
    try:
        index = int(arg) - 1  # user input is 1-based
    except ValueError:
        index = None

    if index is not None:
        print(f"测试第 {arg} 个代理...")
        test_specific_proxy(index)
    elif arg == "fast":
        print("快速测试前5个代理...")
        for i, proxy in enumerate(WEBSHARE_PROXIES[:5]):
            test_single_proxy(i, proxy)
    elif arg == "working":
        print("只显示工作中的代理...")
        working, _failed = test_all_proxies()
        if working:
            print("\n生成工作代理列表文件...")
            with open('working_proxies.txt', 'w', encoding='utf-8') as f:
                f.writelines(f"{proxy}\n" for _, proxy, _, _ in working)
            print("已保存到 working_proxies.txt")
    else:
        print("使用方法:")
        print("  python3 test-webshare-proxies.py           # 测试所有代理")
        print("  python3 test-webshare-proxies.py 1-20      # 测试指定代理")
        print("  python3 test-webshare-proxies.py fast      # 快速测试前5个")
        print("  python3 test-webshare-proxies.py working   # 只保存工作的代理")


if __name__ == "__main__":
    main()