webshare/test-webshare-proxies.py

179 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
测试 Webshare SOCKS5 代理连通性
"""
import requests
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
# Proxy list read from proxies_socks5.txt.
# Each entry is a SOCKS5 URL of the form socks5://user:password@host:port.
# NOTE(review): every URL below embeds a username and password directly in
# source; consider loading them from an untracked file or the environment.
WEBSHARE_PROXIES = [
"socks5://fbkjstyt:lvo4zphp2wwj@45.196.40.191:6269",
"socks5://fbkjstyt:lvo4zphp2wwj@130.180.228.168:6452",
"socks5://fbkjstyt:lvo4zphp2wwj@72.1.154.35:7926",
"socks5://fbkjstyt:lvo4zphp2wwj@63.141.62.186:6479",
"socks5://fbkjstyt:lvo4zphp2wwj@216.170.122.181:6219",
"socks5://fbkjstyt:lvo4zphp2wwj@192.53.67.209:5758",
"socks5://fbkjstyt:lvo4zphp2wwj@130.180.231.18:8160",
"socks5://fbkjstyt:lvo4zphp2wwj@192.53.142.239:5936",
"socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.22:5686",
"socks5://fbkjstyt:lvo4zphp2wwj@216.98.254.253:6563",
"socks5://fbkjstyt:lvo4zphp2wwj@192.46.188.237:5896",
"socks5://fbkjstyt:lvo4zphp2wwj@45.56.161.56:8932",
"socks5://fbkjstyt:lvo4zphp2wwj@192.46.201.252:6766",
"socks5://fbkjstyt:lvo4zphp2wwj@45.196.50.62:6384",
"socks5://fbkjstyt:lvo4zphp2wwj@193.160.83.42:6363",
"socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.234:5898",
"socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.62:6622",
"socks5://fbkjstyt:lvo4zphp2wwj@72.46.139.239:6799",
"socks5://fbkjstyt:lvo4zphp2wwj@103.130.178.62:5726",
"socks5://fbkjstyt:lvo4zphp2wwj@72.46.138.21:6247"
]
def test_single_proxy(index, proxy_url):
    """Check one proxy by fetching http://httpbin.org/ip through it.

    The report for a proxy is written as one complete line once the outcome
    is known. Printing the prefix before the request and the result after it
    would let other worker threads splice their own output in between when
    this function runs inside a thread pool.

    Args:
        index: Zero-based position of the proxy; displayed one-based.
        proxy_url: Proxy URL, used for both the ``http`` and ``https`` schemes.

    Returns:
        A tuple ``(index, success, ip, elapsed, error)``:

        * on success: ``success`` is True, ``ip`` is the ``origin`` field of
          the JSON response (``'Unknown'`` if absent), ``elapsed`` is the
          request duration in seconds and ``error`` is None;
        * on failure: ``success`` is False, ``ip`` is None, ``elapsed`` is 0
          and ``error`` is a short description of what went wrong.
    """
    label = f"[{index+1:2d}] 测试 {proxy_url} ..."
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }
    try:
        # perf_counter is monotonic, so the measured latency cannot be
        # distorted by wall-clock adjustments during the request.
        start_time = time.perf_counter()
        # The target is plain HTTP, so no TLS verification option is needed.
        response = requests.get(
            'http://httpbin.org/ip',
            proxies=proxies,
            timeout=15,
        )
        if response.status_code != 200:
            print(f"{label} ✗ HTTP {response.status_code}")
            return index, False, None, 0, f"HTTP {response.status_code}"
        ip = response.json().get('origin', 'Unknown')
        elapsed = time.perf_counter() - start_time
    except requests.exceptions.Timeout:
        print(f"{label} ✗ 超时")
        return index, False, None, 0, "Timeout"
    except requests.exceptions.ProxyError as e:
        print(f"{label} ✗ 代理错误: {str(e)[:50]}")
        return index, False, None, 0, "Proxy Error"
    except Exception as e:
        # Top-level boundary for a single check: any other failure (bad JSON,
        # connection reset, ...) is reported as a failed proxy, not raised.
        print(f"{label} ✗ 错误: {str(e)[:50]}")
        return index, False, None, 0, str(e)[:50]

    print(f"{label} {ip} ({elapsed:.2f}s)")
    return index, True, ip, elapsed, None
def test_all_proxies(max_workers=10):
    """Test every proxy in WEBSHARE_PROXIES concurrently and print a summary.

    Args:
        max_workers: Maximum number of worker threads used for the checks.

    Returns:
        A tuple ``(working_proxies, failed_proxies)``, both sorted by proxy
        index. Entries of ``working_proxies`` are
        ``(index, proxy_url, ip, elapsed)``; entries of ``failed_proxies``
        are ``(index, proxy_url, error)``.
    """
    print(f"开始测试 {len(WEBSHARE_PROXIES)} 个 Webshare SOCKS5 代理...")
    print("=" * 80)

    working_proxies = []
    failed_proxies = []

    # Run the checks concurrently in a thread pool.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its proxy index so that a task which
        # raises can still be attributed to the right proxy.
        futures = {
            executor.submit(test_single_proxy, i, proxy): i
            for i, proxy in enumerate(WEBSHARE_PROXIES)
        }
        for future in as_completed(futures):
            index = futures[future]
            try:
                _, success, ip, elapsed, error = future.result()
            except Exception as e:
                # Record the crash as a failure instead of dropping the
                # proxy, so the summary accounts for every proxy submitted.
                print(f"任务执行错误: {e}")
                failed_proxies.append(
                    (index, WEBSHARE_PROXIES[index], str(e)[:50])
                )
                continue
            if success:
                working_proxies.append(
                    (index, WEBSHARE_PROXIES[index], ip, elapsed)
                )
            else:
                failed_proxies.append((index, WEBSHARE_PROXIES[index], error))

    # Futures complete in arbitrary order; restore list order for the report.
    working_proxies.sort(key=lambda x: x[0])
    failed_proxies.sort(key=lambda x: x[0])

    print("=" * 80)
    print("测试完成!")
    print(f"✓ 工作正常: {len(working_proxies)}")
    print(f"✗ 连接失败: {len(failed_proxies)}")

    if working_proxies:
        print("\n可用代理列表:")
        for index, proxy, ip, elapsed in working_proxies:
            print(f" [{index+1:2d}] {ip} ({elapsed:.2f}s) - {proxy}")

    if failed_proxies:
        print("\n失败代理列表:")
        for index, proxy, error in failed_proxies:
            print(f" [{index+1:2d}] {error} - {proxy}")

    # Response-time statistics over the working proxies only.
    if working_proxies:
        speeds = [elapsed for _, _, _, elapsed in working_proxies]
        avg_speed = sum(speeds) / len(speeds)
        print(f"\n平均响应时间: {avg_speed:.2f}s")
        print(f"最快代理: {min(speeds):.2f}s")
        print(f"最慢代理: {max(speeds):.2f}s")

    return working_proxies, failed_proxies
def test_specific_proxy(index):
    """Run the connectivity check for the proxy at zero-based ``index``.

    Returns:
        The result tuple from ``test_single_proxy``, or None (after printing
        an error message) when ``index`` is outside WEBSHARE_PROXIES.
    """
    total = len(WEBSHARE_PROXIES)
    # Guard clause: reject out-of-range indices before touching the list.
    if not (0 <= index < total):
        print(f"错误: 代理索引 {index} 超出范围 (0-{total-1})")
        return None
    return test_single_proxy(index, WEBSHARE_PROXIES[index])
def main():
    """Command-line entry point.

    With no argument, all proxies are tested. A numeric argument selects a
    single proxy (counted from 1). ``fast`` tests the first five proxies one
    after another, ``working`` tests everything and writes the working proxy
    URLs to ``working_proxies.txt``. Any other argument prints the usage text.
    """
    if len(sys.argv) <= 1:
        # No argument given: test every proxy.
        test_all_proxies()
        return

    arg = sys.argv[1]
    try:
        # Users count proxies from 1; the list itself is zero-based.
        position = int(arg) - 1
        print(f"测试第 {arg} 个代理...")
        test_specific_proxy(position)
    except ValueError:
        # Not a number: treat the argument as a mode keyword.
        if arg == "fast":
            print("快速测试前5个代理...")
            for i, proxy in enumerate(WEBSHARE_PROXIES[:5]):
                test_single_proxy(i, proxy)
        elif arg == "working":
            print("只显示工作中的代理...")
            working, _ = test_all_proxies()
            if working:
                print("\n生成工作代理列表文件...")
                with open('working_proxies.txt', 'w') as f:
                    # One proxy URL per line.
                    f.writelines(f"{proxy}\n" for _, proxy, _, _ in working)
                print("已保存到 working_proxies.txt")
        else:
            usage = (
                "使用方法:",
                " python3 test-webshare-proxies.py # 测试所有代理",
                " python3 test-webshare-proxies.py 1-20 # 测试指定代理",
                " python3 test-webshare-proxies.py fast # 快速测试前5个",
                " python3 test-webshare-proxies.py working # 只保存工作的代理",
            )
            for line in usage:
                print(line)
# Run the command-line interface only when executed as a script, not when
# this module is imported.
if __name__ == "__main__":
    main()