179 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			179 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| 测试 Webshare SOCKS5 代理连通性
 | |
| """
 | |
| 
 | |
| import requests
 | |
| import time
 | |
| import threading
 | |
| from concurrent.futures import ThreadPoolExecutor, as_completed
 | |
| import sys
 | |
| 
 | |
# Proxy list read from proxies_socks5.txt.
# NOTE(review): every URL below embeds the proxy username and password in
# plain text; consider loading them from a file or the environment instead
# of keeping them in source control.
WEBSHARE_PROXIES = [
    "socks5://fbkjstyt:lvo4zphp2wwj@45.196.40.191:6269",
    "socks5://fbkjstyt:lvo4zphp2wwj@130.180.228.168:6452",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.1.154.35:7926",
    "socks5://fbkjstyt:lvo4zphp2wwj@63.141.62.186:6479",
    "socks5://fbkjstyt:lvo4zphp2wwj@216.170.122.181:6219",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.53.67.209:5758",
    "socks5://fbkjstyt:lvo4zphp2wwj@130.180.231.18:8160",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.53.142.239:5936",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.22:5686",
    "socks5://fbkjstyt:lvo4zphp2wwj@216.98.254.253:6563",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.46.188.237:5896",
    "socks5://fbkjstyt:lvo4zphp2wwj@45.56.161.56:8932",
    "socks5://fbkjstyt:lvo4zphp2wwj@192.46.201.252:6766",
    "socks5://fbkjstyt:lvo4zphp2wwj@45.196.50.62:6384",
    "socks5://fbkjstyt:lvo4zphp2wwj@193.160.83.42:6363",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.234:5898",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.62:6622",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.239:6799",
    "socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.62:5726",
    "socks5://fbkjstyt:lvo4zphp2wwj@72.46.138.21:6247"
]
 | |
| 
 | |
def test_single_proxy(index, proxy_url):
    """Probe one proxy by requesting http://httpbin.org/ip through it.

    The progress label and the outcome are written as one complete line so
    that, when several proxies are probed from worker threads, the label of
    one proxy is not separated from its outcome by output of another.

    Args:
        index: Zero-based position of the proxy; displayed one-based.
        proxy_url: Proxy URL used for both the ``http`` and ``https`` schemes.

    Returns:
        A tuple ``(index, success, ip, elapsed, error)``. On success ``ip`` is
        the ``origin`` field of the JSON reply (``'Unknown'`` when absent),
        ``elapsed`` is the request duration in seconds and ``error`` is
        ``None``. On failure ``ip`` is ``None``, ``elapsed`` is ``0`` and
        ``error`` is a short description of what went wrong.
    """
    label = f"[{index+1:2d}] 测试 {proxy_url} ..."
    proxies = {
        'http': proxy_url,
        'https': proxy_url
    }

    try:
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments during the request.
        started = time.perf_counter()
        response = requests.get(
            'http://httpbin.org/ip',
            proxies=proxies,
            timeout=15,
            verify=False
        )

        if response.status_code == 200:
            ip = response.json().get('origin', 'Unknown')
            elapsed = time.perf_counter() - started
            status = f"✓ {ip} ({elapsed:.2f}s)"
            outcome = (index, True, ip, elapsed, None)
        else:
            status = f"✗ HTTP {response.status_code}"
            outcome = (index, False, None, 0, f"HTTP {response.status_code}")

    except requests.exceptions.Timeout:
        status = "✗ 超时"
        outcome = (index, False, None, 0, "Timeout")
    except requests.exceptions.ProxyError as e:
        status = f"✗ 代理错误: {str(e)[:50]}"
        outcome = (index, False, None, 0, "Proxy Error")
    except Exception as e:
        # Deliberate catch-all: a probe must report failure, never crash the
        # caller (covers connection errors, invalid JSON bodies, etc.).
        status = f"✗ 错误: {str(e)[:50]}"
        outcome = (index, False, None, 0, str(e)[:50])

    # The newline is part of the string so the whole report goes out in a
    # single write instead of separate text and line-ending writes.
    print(f"{label} {status}\n", end="", flush=True)
    return outcome
 | |
| 
 | |
def test_all_proxies(max_workers=10):
    """Probe every proxy in ``WEBSHARE_PROXIES`` concurrently and print a report.

    Args:
        max_workers: Size of the thread pool used to run the probes.

    Returns:
        A tuple ``(working_proxies, failed_proxies)``, both sorted by proxy
        index. Working entries are ``(index, proxy_url, ip, elapsed)`` and
        failed entries are ``(index, proxy_url, error)``.
    """
    print(f"开始测试 {len(WEBSHARE_PROXIES)} 个 Webshare SOCKS5 代理...")
    print("=" * 80)

    working_proxies = []
    failed_proxies = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its proxy index so that a task which raises
        # can still be attributed to the right proxy.
        futures = {
            executor.submit(test_single_proxy, i, proxy): i
            for i, proxy in enumerate(WEBSHARE_PROXIES)
        }

        for future in as_completed(futures):
            try:
                index, success, ip, elapsed, error = future.result()
            except Exception as e:
                print(f"任务执行错误: {e}")
                # Count the crashed task as a failure instead of silently
                # dropping the proxy from both tallies.
                index = futures[future]
                failed_proxies.append((index, WEBSHARE_PROXIES[index], str(e)[:50]))
                continue

            if success:
                working_proxies.append((index, WEBSHARE_PROXIES[index], ip, elapsed))
            else:
                failed_proxies.append((index, WEBSHARE_PROXIES[index], error))

    # Futures complete in arbitrary order; restore the list order for output.
    working_proxies.sort(key=lambda entry: entry[0])
    failed_proxies.sort(key=lambda entry: entry[0])

    print("=" * 80)
    print("测试完成!")
    print(f"✓ 工作正常: {len(working_proxies)} 个")
    print(f"✗ 连接失败: {len(failed_proxies)} 个")

    if working_proxies:
        print("\n可用代理列表:")
        for index, proxy, ip, elapsed in working_proxies:
            print(f"  [{index+1:2d}] {ip} ({elapsed:.2f}s) - {proxy}")

    if failed_proxies:
        print("\n失败代理列表:")
        for index, proxy, error in failed_proxies:
            print(f"  [{index+1:2d}] {error} - {proxy}")

    # Response-time statistics over the proxies that answered.
    if working_proxies:
        speeds = [elapsed for _, _, _, elapsed in working_proxies]
        avg_speed = sum(speeds) / len(speeds)
        print(f"\n平均响应时间: {avg_speed:.2f}s")
        print(f"最快代理: {min(speeds):.2f}s")
        print(f"最慢代理: {max(speeds):.2f}s")

    return working_proxies, failed_proxies
 | |
| 
 | |
def test_specific_proxy(index):
    """Probe the proxy stored at zero-based ``index`` in ``WEBSHARE_PROXIES``.

    Returns:
        The result tuple produced by ``test_single_proxy``, or ``None`` after
        printing an error message when ``index`` is outside the list.
    """
    total = len(WEBSHARE_PROXIES)
    if index < 0 or index >= total:
        print(f"错误: 代理索引 {index} 超出范围 (0-{total-1})")
        return None
    return test_single_proxy(index, WEBSHARE_PROXIES[index])
 | |
| 
 | |
def main():
    """Command-line entry point; dispatches on the first argument.

    * no argument -- test every proxy;
    * an integer ``N`` -- test only the N-th proxy (one-based);
    * ``fast`` -- test the first five proxies sequentially;
    * ``working`` -- test every proxy and write the working ones to
      ``working_proxies.txt``, one URL per line;
    * anything else -- print the usage text.
    """
    if len(sys.argv) <= 1:
        test_all_proxies()
        return

    arg = sys.argv[1]

    # Only the conversion is guarded: a ValueError raised later while testing
    # or printing must not be mistaken for a non-numeric argument.
    try:
        index = int(arg) - 1  # the user counts from 1
    except ValueError:
        index = None

    if index is not None:
        print(f"测试第 {arg} 个代理...")
        test_specific_proxy(index)
    elif arg == "fast":
        print("快速测试前5个代理...")
        for i, proxy in enumerate(WEBSHARE_PROXIES[:5]):
            test_single_proxy(i, proxy)
    elif arg == "working":
        print("只显示工作中的代理...")
        working, _failed = test_all_proxies()
        if working:
            print("\n生成工作代理列表文件...")
            with open('working_proxies.txt', 'w', encoding='utf-8') as f:
                f.writelines(f"{proxy}\n" for _, proxy, _, _ in working)
            print("已保存到 working_proxies.txt")
    else:
        print("使用方法:")
        print("  python3 test-webshare-proxies.py           # 测试所有代理")
        print("  python3 test-webshare-proxies.py 1-20      # 测试指定代理")
        print("  python3 test-webshare-proxies.py fast      # 快速测试前5个")
        print("  python3 test-webshare-proxies.py working   # 只保存工作的代理")
 | |
| 
 | |
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()