What is the fastest way to send 100,000 HTTP requests in Python?

21

386

I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.

Bowrah answered 13/4, 2010 at 19:19 Comment(8)
Make sure that you only do HEAD requests (so that you don't download the whole document). See: #107905Enure
Excellent point, Kalmi. If all Igor wants is the status of the request, these 100K requests will go much, much, much quicker. Much quicker.Diaconicum
Yes, Kalmi is right. And in fact I am doing HEAD requests with httplib, and it does speed things up.Bowrah
You don't need threads for this; the most efficient way is likely to use an asynchronous library like Twisted.Shastashastra
jemfinch: I wonder how long it would take Twisted to return me 100,000 HTTP response codes. I haven't tried it yet, have you? With threads I have been able to complete my objective in under 10 minutes.Bowrah
Here are gevent-, twisted-, and asyncio-based code examples (tested on 1,000,000 requests)Tilly
@TarnayKálmán it's possible for requests.get and requests.head (i.e. a page request vs a head request) to return different status codes, so this is not the best adviceAlto
2017 update. Use Python 3.5+ and asyncioTriboelectricity
231

Twistedless solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This one is slightly faster than the Twisted solution and uses less CPU.

Bedevil answered 14/4, 2010 at 5:22 Comment(19)
Kalmi, your solution is pretty good. The timed results of running your program with a file of 100,000 URLs were: real 5m23.863s user 1m28.177s sys 2m34.299s However, one question I have is: isn't populating the queue with each URL redundant, and doesn't it add overhead? Why not just spawn the processes from the URLs as you are reading them from the file (without using a queue)?Bowrah
Well... This is basically a simple threadpool implementation. It ensures that there are no more than 200 jobs running at the same time. And I know no way to implement a threadpool without using something queuelike. And yes, you do need a threadpool. I'm pretty sure you want to be able to control the number of requests that can happen at the same time.Enure
Kalmi: I wrote up a Python script similar to yours that contains the following: if threading.active_count() > 200: time.sleep(10) This allows outstanding threads to catch up so that the program doesn't crash. So no queue was needed in this implementation.Bowrah
I believe that using a Queue is less overhead than starting threads. And your solution could stand idling for x amount of time if all the outstanding requests finish before the sleep is over.Enure
Um, why did it crash in the first place?Enure
@Kalmi, why do you set Queue to concurrent*2 ?Boesch
Well, I'm not exactly sure why it did that. It probably matters very little (if at all). I guess all I wanted was for it to never be empty. The populating thread could be waiting for IO or it might not get scheduled for quite some time. Who knows what might happen under heavy load (with many threads)... I just wanted to be on the safe side.Enure
Don't forget to close the connection with conn.close(). Opening too many HTTP connections may halt your script at some point and eat memory.Solubility
Should it be "from queue import Queue" instead of "from Queue import Queue"?Vertex
@hyh, the Queue module has been renamed to queue in Python 3. This is Python 2 code.Enure
How much faster can you go if you want to talk with the SAME server each time, by persisting the connection? Can this even be done across threads, or with one persistent connection per thread?Wivina
I have experienced problems using httplib to send concurrent requests. Sometimes some requests didn't arrive to the server, but it depends on the server and on the client that I use ... so I suspect that it is a send time and response time matter.Harrell
How do you end the program when all the URLs in 'urllist.txt' have been iterated through?Merete
@Nyxynyx, q.join() blocks till the queue gets empty, and there is nothing after that, so it exits. All the threads are daemon threads, so they won't prevent it.Enure
@TarnayKálmán if I wanted to save the response codes (e.g. to a list) how would I introduce that to the code?Illuminati
@mptevsion, if you are using CPython, you could (for example) just replace "print status, url" with "my_global_list.append((status, url))". (Most operations on) lists are implicitly thread-safe in CPython (and some other python implementations) due to the GIL, so this is safe to do.Enure
You can do without queue by reading a line from file inside doWork() but I guess that would require some sync (mutex?)Acro
Yeah, you could do that, I haven't thought of that, but I really doubt you would win anything by doing that.Enure
You can find the same python3 code here... gist.github.com/im-noob/c9d78449f025f503bfbbdabef2f42c96Ingmar
99

Things have changed quite a bit since 2010 when this was posted. I haven't tried all the other answers, but I have tried a few, and I found this to work best for me, using Python 3.6.

I was able to fetch about 150 unique domains per second running on AWS.

import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
Vegetarian answered 10/9, 2017 at 19:13 Comment(7)
I'm only asking because I don't know but could this futures stuff be replaced with async/await?Sukey
It could, but I have found the above to work better. You could use aiohttp, but it's not part of the standard lib and is changing quite a lot. It does work, but I just haven't found it to work as well. I get higher error rates when I use it, and for the life of me I can't get it to work as well as concurrent futures, although in theory it seems that it should work better; see: #45801357. If you get it to work well please post your answer so I can test it.Vegetarian
This is a nitpick, but I think it's a lot cleaner to put time1 = time.time() at the top of the for loop and time2 = time.time() right after the for loop.Hattiehatton
I tested your snippet; somehow it executes twice. Am I doing something wrong? Or is it meant to run twice? If it's the latter case, can you also help me understand how it triggers twice?Manicure
It shouldn't run twice. Not sure why you are seeing that.Vegetarian
Hey @GlenThompson can you tell me what is your AWS server configuration?Leeannaleeanne
@UjjwalSinghBaghel thats a pretty broad question. I was running inside a docker container. The only things I would say is that you might need to consider the cost benefit of different sizes of EC2 machines. Also ulimits in the context of docker. My contact information is in my profile if you want to reach out.Vegetarian
70

I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

You can read more about it and see an example here.

Warrigal answered 28/8, 2019 at 9:25 Comment(9)
Is this similar to C# async/await and Kotlin Coroutines?Bowrah
@IgorGanapolsky, yes, it's very similar to C# async/await. I'm not familiar with Kotlin Coroutines.Eastereasterday
@sandyp, I am not sure if it works, but if you want to try you will have to use the UnixConnector for aiohttp. Read more here: docs.aiohttp.org/en/stable/client_reference.html#connectors.Eastereasterday
Thanks @MariusStănescu. That is exactly what I used.Obbligato
+1 for showing asyncio.gather(*tasks). here is one such snippet that i used: urls= [fetch(construct_fetch_url(u),idx) for idx, u in enumerate(some_URI_list)] results = await asyncio.gather(*urls)Presentable
Not sure what I'm doing wrong but I cannot get an output from this; runs but blank terminal. None of the print statements are working. I went through the basic debugging steps like checking if the URLs are correctly fed in, if main() is executed etc. Any idea what it could be?Ejector
I'd like to use this to test a site that returns data based on IDs in url params, i'd be querying about 3000 IDs. Could this cause a DDOS? And therefore add I should a sleep? By the way it's probably a small server, no fancy load balancer. Please advise.Jenellejenesia
To be on the safe side I would make the requests in batches of 100Eastereasterday
An improvement that increases performance a lot is to change the for url in urls: loop into a list comprehension: tasks = [fetch_html(url=url, session=session, **kwargs) for url in urls]Intramolecular
61

A solution using the tornado asynchronous networking library:

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

This code is using non-blocking network I/O and doesn't have any restriction. It can scale to tens of thousands of open connections. It will run in a single thread but will be way faster than any threading solution. Check out non-blocking I/O.

Amoebic answered 28/8, 2014 at 13:11 Comment(9)
And these will run fully in parallel? Or is there some restriction on the number of threads?Bowrah
Can you explain what is happening here with the global i variable? Some sort of error checking?Fromm
It is a counter for determining when to exit the ioloop -- so when you are done.Maillol
This is assuming that we do not want to set proxies?Triboelectricity
@AndrewScottEvans it assumed that you are using python 2.7 and proxiesHubbub
@Amoebic I tested your code but I only get 599 response codes. Do you know where the problem could be? ThanksPony
@Amoebic - If I am not interested at all in the response, meaning only wish to send as many requests as possible as fast as possible towards the server, what (if any) should I modify in the example above ? Thanks !!Surefire
@Guy Avraham Good luck getting help on your ddos plan.Quiescent
@Quiescent - you got me :) Actually I was trying to do some very naive "stress test"Surefire
47

Threads are absolutely not the answer here. They will provide both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is "the fastest way".

A little bit of Twisted and its asynchronous HTTP client would give you much better results.
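
For illustration, here is a rough sketch of that approach (my own code, not from this answer), using twisted.web.client.Agent on Python 3, with a DeferredSemaphore to keep roughly 200 requests in flight at once; the urls.txt file is assumed, as in the other answers:

from twisted.internet import reactor
from twisted.internet.defer import DeferredSemaphore, DeferredList
from twisted.web.client import Agent

concurrent = 200
agent = Agent(reactor)
sem = DeferredSemaphore(concurrent)          # caps the number of requests in flight

def report(response, url):
    # a fuller version would also drain the (empty) HEAD body with twisted.web.client.readBody
    print(response.code, url)

def report_error(failure, url):
    print('error', url)

deferreds = []
for line in open('urls.txt'):
    url = line.strip()
    # Agent.request expects bytes for both the method and the URL
    d = sem.run(agent.request, b'HEAD', url.encode('ascii'))
    d.addCallbacks(report, report_error, callbackArgs=[url], errbackArgs=[url])
    deferreds.append(d)

DeferredList(deferreds).addCallback(lambda _: reactor.stop())
reactor.run()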

Bouldin answered 13/4, 2010 at 20:14 Comment(6)
ironfroggy: I am leaning toward your sentiments. I tried implementing my solution with threads and queues (for automatic mutexes), but can you imagine how long it takes to populate a queue with 100,000 things?? I'm still playing around with different options and suggestions by everyone on this thread, and maybe Twisted will be a good solution.Bowrah
You can avoid populating a queue with 100k things. Just process items one at a time from your input, then launch a thread to process the request corresponding to each item. (As I describe below, use a launcher thread to start the HTTP request threads when your thread count is below some threshold. Make the threads write the results out into a dict mapping URL to response, or append tuples to a list.)Furthermost
ironfroggy: Also, I'm curious about what bottlenecks you've found using Python threads? And how do Python threads interact with the OS kernel?Furthermost
Make sure you install the epoll reactor; otherwise you'll be using select/poll, and it will be very slow. Also, if you're going to actually try to have 100,000 connections open simultaneously (assuming your program is written that way, and the URLs are on different servers), you'll need to tune your OS so that you won't run out of file descriptors, ephemeral ports, etc. (it's probably easier to just make sure that you don't have more than, say, 10,000 outstanding connections at once).Primine
erikg: you did recommend a great idea. However, the best result I was able to achieve with 200 threads was approx. 6 minutes. I'm sure there are ways to accomplish this in lesser time... Mark N: if Twisted is the way I decide to go, then epoll reactor is surely useful. However, if my script will be run from multiple machines, wouldn't that necessitate the installation of Twisted on EACH machine? I don't know if I can convince my boss to go that route...Bowrah
@ErikGarrison think ironfroggy is referring at least in part to the GIL. async trumps threaded for tasks like this in python. threadpools or separate processes wrapped around async could be another method.Leverick
26

Use grequests; it's a combination of requests + the gevent module.

GRequests allows you to use Requests with Gevent to make asynchronous HTTP requests easily.

Usage is simple:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Create a set of unsent Requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
Sociality answered 14/7, 2014 at 7:41 Comment(2)
gevent now supports python 3Gualterio
grequests is not part of normal requests and seems to be largely unmaintainedInquire
26

(Note to self for next project)

Python 3 solution using only requests. It's the simplest and it's fast, no need for multiprocessing or complicated asynchronous libraries.

The most important aspect is to reuse connections, especially for HTTPS (TLS requires an extra round trip to open). Note that a connection is specific to a subdomain. If you scrape many pages on many domains, you can sort the list of URLs to maximize connection reuse (it effectively sorts by domain).
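
For example, a slightly more explicit variant of that suggestion (my own one-liner, not from the answer) sorts by host rather than by the full URL string, so requests to the same domain end up adjacent and the pooled connections get reused:

from urllib.parse import urlparse

urls.sort(key=lambda u: urlparse(u).netloc)   # group URLs by host to maximize connection reuse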

It will be as fast as any asynchronous code, when given enough threads. (requests releases the python GIL when waiting for the response).

[Production grade code with some logging and error handling]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://mcmap.net/q/49135/-what-is-the-fastest-way-to-send-100-000-http-requests-in-python

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()
Relieve answered 29/7, 2021 at 22:10 Comment(5)
What do you mean by ~"sort URLs"?Bowrah
Sort the list of URLs: sorted(urls)Relieve
if you start running into 400 errors "request/cookie too large"- work a session.cookies.clear() into your code.Imprimatur
"requests releases the python GIL when waiting for the response". How do you know that? Thank you.Jonas
How do you do something useful with the request? like where can you cast the response to json and read out values?Trope
11

A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently and in how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow this design pattern to resolve the above issue:

  1. Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until you are finished.
  2. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption. A rough sketch of this pattern appears right after this list.
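
Here is a rough sketch of that pattern (my own illustration, not code from this answer), written in the same Python 2 style as the question, with the launcher logic folded into the main loop for brevity; urllist.txt and the 100-thread cap are assumptions:

import threading
import time
import httplib                     # use http.client on Python 3
from urlparse import urlparse      # use urllib.parse on Python 3

MAX_THREADS = 100
results = []                       # list.append is effectively atomic under CPython's GIL

def check(url):
    try:
        parts = urlparse(url)
        conn = httplib.HTTPConnection(parts.netloc)
        conn.request("HEAD", parts.path or "/")
        results.append((url, conn.getresponse().status))
    except Exception:
        results.append((url, "error"))

threads = []
for line in open('urllist.txt'):
    # launcher: wait while too many request threads are still running
    while threading.active_count() > MAX_THREADS:
        time.sleep(0.1)
    t = threading.Thread(target=check, args=(line.strip(),))
    t.start()
    threads.append(t)

for t in threads:                  # wait for the stragglers to finish
    t.join()

print len(results), "URLs checked"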

I would suggest you use the threading module. You can use it to launch and track running threads. Python's threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.

Finally, if you'd like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It's a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.

Furthermost answered 13/4, 2010 at 20:9 Comment(7)
erikg: would throwing a queue into your equation be reasonable (for mutual-exclusion locking)? I suspect that Python's GIL isn't geared toward playing with thousands of threads.Bowrah
Why do you need mutual-exclusion locking to prevent the generation of too many threads? I suspect I misunderstand the term. You can track running threads in a thread queue, removing them when they complete and adding more up to said thread limit. But in a simple case such as the one in question you can also just watch the number of active threads in the current Python process, wait until it falls below a threshold, and launch more threads up to the threshold as described. I guess you could consider this an implicit lock, but no explicit locks are required afaik.Furthermost
erikg: don't multiple threads share state? On page 305 in O'Reilly's book "Python for Unix and Linux System Administration" it states: "... using threading without queues makes it more complex than many people can realistically handle. It is a much better idea to always use the queuing module if you find you need to use threads. Why? Because the queue module also alleviates the need to explicitly protect data with mutexes because the queue itself is already protected internally by a mutex." Again, I welcome your point of view on this.Bowrah
Igor: You are absolutely right that you should use a lock. I've edited the post to reflect this. That said, practical experience with python suggests that you don't need to lock data structures which you modify atomically from your threads, such as by list.append or by the addition of a hash key. The reason, I believe, is the GIL, which provides operations such as list.append with a degree of atomicity. I am currently running a test to verify this (use 10k threads to append numbers 0-9999 to a list, check that all appends worked). After nearly 100 iterations the test has not failed.Furthermost
Igor: I've asked another question on this topic: #2740935Furthermost
Erik G: You are right about the GIL, it enforces atomicity and integrity of shared data. In my case, I am reading 100,000 URLs from a file, spawning as many threads as possible to send HTTP requests to those URLs, and putting the response codes into a data structure (finally writing results to a file). So the only benefit I see to using a queue in my case is its join() method, which prevents the program from exiting before all queue items are processed. Is there another benefit to using a queue that I am not seeing?Bowrah
Igor: You can retain references to all the launched threads in a list, and then iterate over the threads, calling join() on each of them at the end of your program. (I provide an example in the other question I reference in these comments.) This will have the same effect. I think that the difference between the approaches in this case is mostly stylistic. I would find the non-queue case simpler and faster to write, but I might have more confidence in the long-term viability of the code I wrote using queues for thread management.Furthermost
9

If you're looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.

Specifically, I'd suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve, but it is quite easy to use once you get a good handle on Twisted's style of asynchronous programming.

A HowTo on Twisted's asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html

Talyah answered 13/4, 2010 at 20:12 Comment(2)
Rakis: I am currently looking into asynchronous and non-blocking I/O. I need to learn it better before I implement it. One comment I'd like to make on your post is that it is impossible (at least under my Linux distribution) to spawn "thousands of OS threads". There is a maximum number of threads that Python will allow you to spawn before the program breaks. And in my case (on CentOS 5) maximum number of threads is 303.Bowrah
That's good to know. I've never tried spawning more than a handful in Python at once but I would have expected to be able to create more than that before it bombed.Talyah
7

A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Test time:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Ping time:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Bedevil answered 14/4, 2010 at 2:46 Comment(1)
Using Twisted as a threadpool is ignoring most of the benefits you can get from it. You should be using the async HTTP client instead.Algo
3
pip install requests-threads

Example Usage using async/await — send 100 concurrent requests

from requests_threads import AsyncSession

session = AsyncSession(n=100)

async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)

if __name__ == '__main__':
    session.run(_main)

This example works on Python 3 only. You can also provide your own asyncio event loop!

Example Usage using Twisted

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession

session = AsyncSession(n=100)

@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)

if __name__ == '__main__':
    react(main)

This example works on both Python 2 and Python 3.

My repo may also be helpful; it has one basic example: WRITING FAST ASYNC HTTP REQUESTS IN PYTHON

Chace answered 23/11, 2021 at 19:17 Comment(0)
3

Here's an "async" solution that doesn't use asyncio, but the lower-level mechanism asyncio uses (on Linux): select(). (Or maybe asyncio uses poll, or epoll, but it's a similar principle.)

It's a slightly modified version of the example from PyCurl.

(For simplicity it requests the same URL multiple times, but you can easily modify it to retrieve a bunch of distinct URLs.)

(Another slight modification can make this retrieve the same URL over and over as an infinite loop. Hint: change while urls and handles to while handles, and change while nprocessed<nurls to while 1.)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over

# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')

# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
  c = pycurl.Curl()
  c.setopt(pycurl.FOLLOWLOCATION,  1)
  c.setopt(pycurl.MAXREDIRS,       5)
  c.setopt(pycurl.CONNECTTIMEOUT,  30)
  c.setopt(pycurl.TIMEOUT,         300)
  c.setopt(pycurl.NOSIGNAL,        1)
  m.handles.append(c)

handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      reply = gzip.decompress(c.buf.getvalue())
      print(f'\x1b[33mGET  \x1b[32m{t1-c.t0:.3f}  \x1b[37m{len(reply):9,}  \x1b[0m{reply[:32]}...')  # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      print(f'\x1b[31mFAIL {c.url} {errno} {errmsg}')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

for c in m.handles:
  c.close()
m.close()
Overturf answered 27/12, 2021 at 3:50 Comment(2)
Why is asyncio slower than select?Bowrah
I don't think I said it's slower, but, now that you mention it, this is Python after all, so asyncio could have overhead when compared to raw calls to select (which is why I'm favorable to the idea of not using asyncio but using select directly if it's easy or if you have the time or if you're having fun).Postpositive
2

Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn't have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
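
Filled in for this question (my guess at the wiring, so double-check it against the library's docs): some_callable fetches the HEAD status, list_of_args is the list of URLs, and callback prints each result as the pool hands it back:

import httplib
from urlparse import urlparse
from threadpool import ThreadPool, makeRequests

def get_status(url):                      # some_callable
    parts = urlparse(url)
    conn = httplib.HTTPConnection(parts.netloc)
    conn.request("HEAD", parts.path or "/")
    return url, conn.getresponse().status

def print_result(request, result):        # callback(work request, result)
    print result[1], result[0]

urls = [line.strip() for line in open('urllist.txt')]   # list_of_args
pool = ThreadPool(100, q_size=1000)       # bounded queue, see the q_size comment below
for req in makeRequests(get_status, urls, print_result):
    pool.putRequest(req)
pool.wait()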

Hope this helps.

Clique answered 13/4, 2010 at 19:42 Comment(3)
I suggest that you specify q_size for ThreadPool like this: ThreadPool(poolsize, q_size=1000) So that you won't have 100000 WorkRequest objects in memory. "If q_size>0 the size of the work request queue is limited and the thread pool blocks when the queue is full and it tries to put more work requests in it (see putRequest method), unless you also use a positive timeout value for putRequest."Enure
So far I'm trying to implement the threadpool solution - as suggested. However, I don't understand the parameter list in the makeRequests function. What is some_callable, list_of_args, callback? Perhaps if I saw a real code snippet that would help. I'm surprised that the author of that library didn't post ANY examples.Bowrah
some_callable is your function that all your work is done in (connecting to the http server). list_of_args is arguments that will be passed into some_callabe. callback is a function that will be called when the worker thread is done. It takes two arguments, the worker object (don't need to concern your self with this really), and the results that the worker retrieved.Clique
2

For your case, threading will probably do the trick, since you'll be spending most of the time waiting for a response. There are helpful modules like Queue in the standard library that might help.

I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn't on the scale you are talking about.

If your task is more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won't block each other, since the locking is per process).
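
If you want a pool without writing the Queue plumbing yourself, one standard-library shortcut (my suggestion, not part of the original answer) is multiprocessing.dummy, which exposes the same Pool API backed by threads rather than processes; a minimal Python 2 sketch, assuming a urllist.txt like the other answers:

from multiprocessing.dummy import Pool   # same API as multiprocessing.Pool, but uses threads
import httplib
from urlparse import urlparse

def head_status(url):
    try:
        parts = urlparse(url)
        conn = httplib.HTTPConnection(parts.netloc)
        conn.request("HEAD", parts.path or "/")
        return url, conn.getresponse().status
    except Exception:
        return url, "error"

urls = [line.strip() for line in open('urllist.txt')]
pool = Pool(100)                          # 100 worker threads
for url, status in pool.map(head_status, urls):
    print status, url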

Allie answered 13/4, 2010 at 19:43 Comment(1)
The only thing I'd like to mention is that spawning multiple processes may be more expensive than spawning multiple threads. Also, there is no clear performance gain in sending out 100,000 HTTP requests with multiple processes vs. multiple threads.Bowrah
2

This twisted async web client goes pretty fast.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)
Alexander answered 12/8, 2014 at 3:0 Comment(0)
2

Create an epoll object,
open many client TCP sockets,
adjust their send buffers to be a bit more than the request header,
send a request header (it should be immediate, just placing it into a buffer), and register the socket in the epoll object,
do .poll on the epoll object,
read the first 3 bytes from each socket returned by .poll,
write them to sys.stdout followed by \n (don't flush), and close the client socket.

Limit the number of sockets opened simultaneously — handle errors when sockets are created. Create a new socket only if another is closed.
Adjust OS limits.
Try forking into a few (not many) processes: this may help to use the CPU a bit more effectively.
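
A rough sketch of that recipe (my own illustration, with almost no error handling, and parsing the status code instead of just echoing 3 bytes); it assumes Linux, Python 3, plain HTTP on port 80, and a hosts.txt of bare hostnames:

import select
import socket

hosts = [h.strip() for h in open('hosts.txt')]
MAX_OPEN = 500                                   # cap on simultaneously open sockets

ep = select.epoll()
socks = {}                                       # fileno -> (socket, host)
remaining = list(hosts)

def open_socket(host):
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect((host, 80))                    # note: DNS resolution still blocks here
    except BlockingIOError:
        pass                                     # normal: the connect is in progress
    ep.register(s.fileno(), select.EPOLLOUT)     # wake up when the socket becomes writable
    socks[s.fileno()] = (s, host)

def finish(fd, status):
    s, host = socks.pop(fd)
    ep.unregister(fd)
    s.close()
    print(host, status)
    if remaining:                                # keep the pipeline full
        open_socket(remaining.pop())

while remaining and len(socks) < MAX_OPEN:
    open_socket(remaining.pop())

while socks:
    for fd, event in ep.poll(1.0):
        s, host = socks[fd]
        if event & select.EPOLLIN:               # part of the response has arrived
            data = s.recv(64)                    # enough for the "HTTP/1.1 200 OK" status line
            finish(fd, data.split(b" ")[1].decode() if data.count(b" ") else "error")
        elif event & (select.EPOLLERR | select.EPOLLHUP):
            finish(fd, "error")                  # connect failed or the peer hung up early
        elif event & select.EPOLLOUT:            # connected: the request fits in the send buffer
            s.send(b"HEAD / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % host.encode())
            ep.modify(fd, select.EPOLLIN)        # now wait for the response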

Deluna answered 16/7, 2019 at 13:46 Comment(1)
@Bowrah Must be. I'd be surprised otherwise. But it certainly needs experimentation.Deluna
2

I found using the tornado package to be the fastest and simplest way to achieve this:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
Becoming answered 19/6, 2020 at 8:1 Comment(1)
My god, finally, thanks a lot. I was stuck using shitty concurrent futures, and for some reason in my env it keeps getting stuck in an infinite loop on some url calls and I'm 100% sure I'm using it correctly. This solution with tornado is top notchFilter
2

The Scrapy framework will solve your problem fast and professionally. It will also cache all the requests, so that you can rerun only the failed ones later on.

Save this script as quote_spiders.py.

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field

class TextCleaningPipeline(object):
    def _clean_text(self, text):
        text = text.replace('“', '').replace('”', '')
        table = str.maketrans({key: None for key in string.punctuation})
        clean_text = text.translate(table)
        return clean_text.lower()

    def process_item(self, item, spider):
        item['text'] = self._clean_text(item['text'])
        return item

class JsonWriterPipeline(object):
    def open_spider(self, spider):
        self.file = open(spider.settings['JSON_FILE'], 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

class QuoteItem(Item):
    text = Field()
    author = Field()
    tags = Field()
    spider = Field()

class QuoteSpider(scrapy.Spider):
    name = "quotes"

    def start_requests(self):
        urls = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            item['spider'] = self.name
            yield item

if __name__ == '__main__':
    settings = dict()
    settings['USER_AGENT'] = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    settings['HTTPCACHE_ENABLED'] = True
    settings['CONCURRENT_REQUESTS'] = 20
    settings['CONCURRENT_REQUESTS_PER_DOMAIN'] = 20
    settings['JSON_FILE'] = 'items.jl'
    settings['ITEM_PIPELINES'] = dict()
    settings['ITEM_PIPELINES']['__main__.TextCleaningPipeline'] = 800
    settings['ITEM_PIPELINES']['__main__.JsonWriterPipeline'] = 801

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()

followed by

$ pip install Scrapy
$ python quote_spiders.py 

To fine-tune the scraper, adjust the CONCURRENT_REQUESTS and CONCURRENT_REQUESTS_PER_DOMAIN settings accordingly.

Lagomorph answered 15/6, 2022 at 5:6 Comment(2)
What makes Scrapy faster?Bowrah
Scrapy is based on the battle-tested Twisted framework, so it can be as fast as asyncio. It also has great docs and a vibrant community around it.Lagomorph
0

Consider using Windmill, although Windmill probably can't do that many threads.

You could do it with a hand-rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, try looking into just using simple Perl with the LWP::ConnCache class. You'll probably get more performance (more connections) that way.

Kohima answered 13/4, 2010 at 20:20 Comment(0)
0

[Tool]

Apache Bench is all you need: a command-line program (CLI) for measuring the performance of HTTP web servers.

A nice blog post for you: https://www.petefreitag.com/item/689.cfm (from Pete Freitag)

Ineffable answered 18/2, 2021 at 9:32 Comment(1)
The OP was not about measuring one server. It was to send many requests concurrently to many servers, to collect the responses. Kind of like web crawling.Bowrah
-1

The easiest way would be to use Python's built-in threading library. They're not "real" / kernel threads; they have issues (like serialization), but are good enough. You'd want a queue & thread pool. One option is here, but it's trivial to write your own. You can't parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.

Corrode answered 13/4, 2010 at 19:30 Comment(5)
Python's threads are quite real, as opposed to Ruby's for instance. Under the hood they are implemented as native OS threads, at least on Unix/Linux and Windows. Maybe you're referring to the GIL, but it doesn't make the threads less real...Holland
Eli is right about Python's threads, but Pestilence's point that you'd want to use a thread pool is correct, too. The last thing that you'd want to do in this case is try to start a separate thread for each of the 100K requests simultaneously.Diaconicum
Igor, you can't sensibly post code snippets in comments, but you can edit your question and add them there.Diaconicum
Pestilence: how many queues and threads-per-queue would you recommend for my solution?Bowrah
Plus, this is an I/O-bound task, not CPU-bound; the GIL largely affects CPU-bound tasksWacke
