I am trying to use `aiohttp` to make asynchronous HTTP requests over multiple SOCKS proxies. Basically, I am creating a pool of Tor clients with different IP addresses, and I want to be able to route HTTP requests through them using `aiohttp`.
Based on the suggestions here and here, I have been trying to use aiosocks, but the examples in those threads do not work (if they ever did) because they are based on an old version of aiosocks
with a different API. Documentation and examples of using aiosocks
online are very sparse (it doesn't seem widely used). But I haven't been able to find any other solutions for using aiohttp
with SOCKS proxies.
Below is the code I have so far (sorry for the large amount of code - I tried to slim down the example as much as I could!). First I initialize the Tor clients with stem
:
from datetime import datetime
import stem.process
from TorUtils import printCircuits, cleanShutdown
NUM_TOR_CLIENTS = 3

# One (SOCKS port, control port) pair of strings per client. SOCKS ports
# count up from 9050; control ports follow directly after them.
tor_ports = [
    (str(9050 + offset), str(9050 + NUM_TOR_CLIENTS + offset))
    for offset in range(NUM_TOR_CLIENTS)
]

# Every ISO 3166 country code except for {US} and {CA}
country_codes = '{AF}, {AX}, {AL}, {DZ}, {AS}, {AD}, {AO}, {AI}, {AQ}, {AG}, {AR}, {AM}, {AW}, {AU}, {AT}, {AZ}, {BS}, {BH}, {BD}, {BB}, {BY}, {BE}, {BZ}, {BJ}, {BM}, {BT}, {BO}, {BQ}, {BA}, {BW}, {BV}, {BR}, {IO}, {BN}, {BG}, {BF}, {BI}, {KH}, {CM}, {CV}, {KY}, {CF}, {TD}, {CL}, {CN}, {CX}, {CC}, {CO}, {KM}, {CG}, {CD}, {CK}, {CR}, {CI}, {HR}, {CU}, {CW}, {CY}, {CZ}, {DK}, {DJ}, {DM}, {DO}, {EC}, {EG}, {SV}, {GQ}, {ER}, {EE}, {ET}, {FK}, {FO}, {FJ}, {FI}, {FR}, {GF}, {PF}, {TF}, {GA}, {GM}, {GE}, {DE}, {GH}, {GI}, {GR}, {GL}, {GD}, {GP}, {GU}, {GT}, {GG}, {GN}, {GW}, {GY}, {HT}, {HM}, {VA}, {HN}, {HK}, {HU}, {IS}, {IN}, {ID}, {IR}, {IQ}, {IE}, {IM}, {IL}, {IT}, {JM}, {JP}, {JE}, {JO}, {KZ}, {KE}, {KI}, {KP}, {KR}, {KW}, {KG}, {LA}, {LV}, {LB}, {LS}, {LR}, {LY}, {LI}, {LT}, {LU}, {MO}, {MK}, {MG}, {MW}, {MY}, {MV}, {ML}, {MT}, {MH}, {MQ}, {MR}, {MU}, {YT}, {MX}, {FM}, {MD}, {MC}, {MN}, {ME}, {MS}, {MA}, {MZ}, {MM}, {NA}, {NR}, {NP}, {NL}, {NC}, {NZ}, {NI}, {NE}, {NG}, {NU}, {NF}, {MP}, {NO}, {OM}, {PK}, {PW}, {PS}, {PA}, {PG}, {PY}, {PE}, {PH}, {PN}, {PL}, {PT}, {PR}, {QA}, {RE}, {RO}, {RU}, {RW}, {BL}, {SH}, {KN}, {LC}, {MF}, {PM}, {VC}, {WS}, {SM}, {ST}, {SA}, {SN}, {RS}, {SC}, {SL}, {SG}, {SX}, {SK}, {SI}, {SB}, {SO}, {ZA}, {GS}, {SS}, {ES}, {LK}, {SD}, {SR}, {SJ}, {SZ}, {SE}, {CH}, {SY}, {TW}, {TJ}, {TZ}, {TH}, {TL}, {TG}, {TK}, {TO}, {TT}, {TN}, {TR}, {TM}, {TC}, {TV}, {UG}, {UA}, {AE}, {GB}, {UM}, {UY}, {UZ}, {VU}, {VE}, {VN}, {VG}, {VI}, {WF}, {EH}, {YE}, {ZM}, {ZW}'

# One option dict per client, later handed to launch_tor_with_config.
# Only the two ports and the data directory differ between clients.
tor_configs = [
    {
        'SOCKSPort': socks_port,
        'ControlPort': control_port,
        'DataDirectory': './.tordata' + socks_port,
        'CookieAuthentication': '1',
        'MaxCircuitDirtiness': '3600',
        'ExcludeNodes': country_codes,
        'EntryNodes': '{us}, {ca}',
        'ExitNodes': '{us}, {ca}',
        'StrictNodes': '1',
        'GeoIPExcludeUnknown': '1',
        'EnforceDistinctSubnets': '0',
    }
    for socks_port, control_port in tor_ports
]

print(f"Spawning {NUM_TOR_CLIENTS} tor clients ...")
start_time = datetime.now()

# Launch the clients one after another, keeping every process handle
# paired with the config it was started from.
tor_clients = []
for cfg in tor_configs:
    tor_process = stem.process.launch_tor_with_config(config=cfg)
    tor_clients.append({'config': cfg, 'process': tor_process})
... and then I am trying to use the following code to make the HTTP requests with aiohttp
:
from collections import defaultdict, deque
from datetime import datetime, timedelta
import asyncio
import aiohttp
import aiosocks
from aiosocks.connector import ProxyConnector, ProxyClientRequest
import async_timeout
# Value passed to async_timeout.timeout() around each request in _get
# (async_timeout interprets it as seconds).
TIMEOUT = 10
async def _get(url, session, proxy, request_limiter):
    """Fetch ``url`` through ``proxy`` and return a summary dict.

    Args:
        url: Address to request.
        session: Client session whose ``get`` method issues the request.
        proxy: Proxy URL forwarded unchanged to ``session.get``.
        request_limiter: Async context manager (a semaphore) that bounds
            the number of concurrent requests.

    Returns:
        A dict with the keys 'url', 'status', 'headers', 'text' and
        'errors'. On success 'errors' is None. On timeout the response
        fields are None and 'errors' is a non-empty description.
    """
    try:
        # The semaphore is acquired outside the timeout, so time spent
        # waiting for a free slot does not count against TIMEOUT.
        async with request_limiter:
            async with async_timeout.timeout(TIMEOUT):
                async with session.get(url, proxy=proxy, proxy_auth=None) as resp:
                    status = int(resp.status)
                    headers = dict(resp.headers)
                    text = await resp.text()
        return {'url': url, 'status': status, 'headers': headers, 'text': str(text), 'errors': None}
    except asyncio.TimeoutError as e:
        # NOTE(review): `queue` is not defined in the visible code; it is
        # presumably a module-level object with a `visited_urls` mapping.
        # Confirm it exists, otherwise this line raises NameError.
        queue.visited_urls[url] = datetime.now()
        # str() of a bare TimeoutError is '', which is falsy and easy to
        # mistake for "no error"; fall back to the exception class name.
        return {'url': url, 'status': None, 'headers': None, 'text': None,
                'errors': str(e) or type(e).__name__}
async def _getPagesTasks(url_list, tor_clients, request_limiter, loop):
    """Launch requests for all web pages, rotating through the Tor clients.

    Args:
        url_list: Iterable of URLs to fetch.
        tor_clients: Sequence of dicts; each must provide
            ``['config']['SOCKSPort']`` as a string.
        request_limiter: Semaphore handed to every ``_get`` call.
        loop: Event loop used to create the request tasks.

    Returns:
        The list of result dicts produced by ``_get``, in ``url_list`` order.
    """
    # NOTE(review): if ProxyConnector() raises "unexpected keyword argument
    # 'resolve'", the installed aiosocks likely targets an older aiohttp
    # release; confirm that the two package versions are compatible.
    sessions = deque()
    try:
        for tor_client in tor_clients:
            conn = ProxyConnector()
            session = aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest)
            sessions.append({
                # Tor's SOCKSPort speaks SOCKS, not HTTP, so the proxy URL
                # must use a socks scheme for aiosocks to handle it.
                'proxy': 'socks5://127.0.0.1:' + tor_client['config']['SOCKSPort'],
                'session': session,
            })

        tasks = []
        for url in url_list:
            # Take the session at the front, then rotate so the next URL
            # is routed through the next Tor client (round-robin).
            current = sessions[0]
            tasks.append(
                loop.create_task(_get(url, current['session'], current['proxy'], request_limiter))
            )
            sessions.rotate(-1)

        return await asyncio.gather(*tasks)
    finally:
        # Close every session that was opened, even when a request failed.
        for entry in sessions:
            await entry['session'].close()
def getPages(url_list, tor_clients):
    """Given a URL list, dispatch pool of tor clients to concurrently fetch URLs.

    Args:
        url_list: Iterable of URLs to fetch.
        tor_clients: Sequence of Tor client dicts, passed on to
            ``_getPagesTasks``.

    Returns:
        The list of responses returned by ``_getPagesTasks``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Create the semaphore only after the new loop is installed: before
        # Python 3.10 asyncio primitives bind to the current event loop at
        # construction time, and a semaphore bound to a different loop fails
        # as soon as a task has to wait on it.
        # Limit to one request per client at a time.
        request_limiter = asyncio.Semaphore(len(tor_clients))
        return loop.run_until_complete(
            _getPagesTasks(url_list, tor_clients, request_limiter, loop)
        )
    finally:
        # Release the loop's resources even when fetching raised.
        loop.close()
However, this code does not run. When I try to run it, I get the error below. I'm wondering whether I'm doing something wrong, or whether this is a problem with `aiosocks` (which seems to have been unmaintained for a while, and might be targeting an older version of `aiohttp` or something ...):
~/Code/gis project/code/TorGetQueue.py in _getPagesTasks(url_list, tor_clients, request_limiter, loop)
50 sessions = deque()
51 for client in tor_clients:
---> 52 conn = ProxyConnector()
53 session = aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest)
54 sessions.append({'proxy': 'http://127.0.0.1:' + client['config']['SOCKSPort'], 'session': session})
~/.local/share/virtualenvs/code-pIyQci_2/lib/python3.6/site-packages/aiosocks/connector.py in __init__(self, verify_ssl, fingerprint, resolve, use_dns_cache, family, ssl_context, local_addr, resolver, keepalive_timeout, force_close, limit, limit_per_host, enable_cleanup_closed, loop, remote_resolve)
54 force_close=force_close, limit=limit, loop=loop,
55 limit_per_host=limit_per_host, use_dns_cache=use_dns_cache,
---> 56 enable_cleanup_closed=enable_cleanup_closed)
57
58 self._remote_resolve = remote_resolve
TypeError: __init__() got an unexpected keyword argument 'resolve'
What am I doing wrong here? Is there an easier way to use SOCKS proxies with aiohttp
? What do I need to change to make this code work with aiosocks
?
Thanks!