python asyncio & httpx

I am very new to asynchronous programming and I was playing around with httpx. I have the following code and I am sure I am doing something wrong; I just don't know what it is. There are two methods, one synchronous and the other asynchronous, and both pull quotes from Google Finance. On my system I am seeing the following times:

Asynchronous: 5.015218734741211
Synchronous: 5.173618316650391

Here is the code:


import httpx
import asyncio
import time



#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
if __name__ == "__main__":

  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  

  print("Running asynchronously...")
  async_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    asyncio.run(async_pull(url))
  async_end = time.time()
  print(f"Time lapsed is: {async_end - async_start}")


  print("Running synchronously...")
  sync_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    sync_pull(url)
  sync_end = time.time()
  print(f"Time lapsed is: {sync_end - sync_start}")

I had hoped the asynchronous approach would take a fraction of the time the synchronous approach takes. What am I doing wrong?

Annora answered 26/5, 2021 at 22:05

When you call asyncio.run(async_pull(url)) you're saying "run async_pull and wait for the result to come back". Since you do this once for each ticker in your loop, you're essentially using asyncio to run things synchronously and won't see any performance benefit.

What you need to do is create several async calls and run them concurrently. There are several ways to do this; the easiest is to use asyncio.gather (see https://docs.python.org/3/library/asyncio-task.html#asyncio.gather), which takes a sequence of coroutines and runs them concurrently. Adapting your code is fairly straightforward: create an async function that takes a list of URLs, calls async_pull on each of them, passes the resulting coroutines to asyncio.gather, and awaits the results. That looks like the following:

import httpx
import asyncio
import time

def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)

async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)


async def async_pull_all(urls):
    return await asyncio.gather(*[async_pull(url) for url in urls])

if __name__ == "__main__":

    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
           'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
           ]

    print("Running asynchronously...")
    async_start = time.time()
    results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")


    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")

Running this way, the asynchronous version runs in about a second for me as opposed to seven synchronously.
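
If you want to squeeze out a little more, you can also share a single AsyncClient across all the requests so the connections get pooled instead of being opened once per URL. A minimal sketch of an alternative async_pull_all (same imports as above):

async def async_pull_all(urls):
    # One shared client means pooled connections across all requests
    async with httpx.AsyncClient() as client:

        async def pull(url):
            r = await client.get(url)
            print(r.status_code)

        return await asyncio.gather(*[pull(url) for url in urls])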

Rhetorician answered 26/5, 2021 at 22:41
Thank you Matt for the explanation and fix! – Annora
This is the only answer that actually addresses the OP's question. The other "answers" are just alternative solutions; they don't explain the issue, which is making individual async calls synchronously. – Maelstrom

Here's a nice pattern I use (I tend to change it a little each time). In general, I make a module async_utils.py and just import the top-level fetching function (e.g. here fetch_things), so the rest of my code is free to forget about the internals (other than error handling). You can do it in other ways, but I like the 'functional' style of aiostream, and I often find the repeated calls to the processing function share certain defaults, which I set up front with functools.partial.

You can pass in a tqdm.tqdm progress bar to pbar (initialised with known size total=len(things)) to have it update when each async response is processed.
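
For example, a minimal sketch of wiring that up (assuming tqdm is installed, and using the fetch_things function and things list shown further below):

from tqdm import tqdm

urls = [t["url"] for t in things]
with tqdm(total=len(things)) as pbar:
    # pbar.update() is called inside process_thing as each response is handled
    fetch_things(urls=urls, things=things, pbar=pbar)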

import asyncio
import httpx
from aiostream import stream
from functools import partial

__all__ = ["fetch", "process_thing", "async_fetch_urlset", "fetch_things"]

async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response


async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    thing.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()


async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

In this example, the input is a list of dicts (with string keys and values), things: list[dict[str, str]], and the key "url" is accessed to retrieve the URL. Having a dict or object rather than just the URL string is desirable when you want to 'map' the result back to the object it came from. The process_thing function modifies the input list things in place (i.e. its changes are not scoped to the function; they are visible in the calling scope).
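
(A tiny standalone illustration of that in-place behaviour, unrelated to httpx:)

def mark_done(d):
    # Mutates the dict that was passed in; no copy is made
    d["computed_value"] = "done"

records = [{"url": "https://example.com"}]
mark_done(records[0])
print(records)  # [{'url': 'https://example.com', 'computed_value': 'done'}]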

You'll often find that errors arise during async runs which you don't get when running synchronously, so you'll need to catch them and retry. A common gotcha is to retry at the wrong level (e.g. around the entire loop).

In particular, you'll want to import and catch httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError, and httpx.ReadTimeout.

Increasing the timeout_s parameter will reduce the frequency of the timeout errors by letting the AsyncClient 'wait' for longer, but doing so may in fact slow down your program (it won't "fail fast" quite as fast).
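
If a single number is too blunt, httpx.Timeout also accepts per-phase values, which slot into the AsyncClient exactly as timeout does in async_fetch_urlset above. A small sketch (the numbers are just placeholders):

import httpx

# 5 seconds to connect, 30 seconds to read the response,
# 10 seconds for everything else (write and pool acquisition)
timeout = httpx.Timeout(10.0, connect=5.0, read=30.0)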

Here's an example of how to use the async_utils module given above:

from async_utils import fetch_things
import httpx
import httpcore

# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)

things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)

# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
    print(f"Retry {i}")
    try:
        urlset = make_urlset(things)
        foo = fetch_things(urls=urlset, things=things, verbose=True)
    except retryable_errors as exc:
        print(f"Caught {exc!r}")
        if i == max_retries - 1:
            raise
    except Exception:
        raise

# SYNCHRONOUS:
#for t in things:
#    resp = httpx.get(t["url"])

In this example I set a key "computed_value" on a dictionary once its async response has been successfully processed, which prevents that URL from being included in the generator on the next round (when make_urlset is called again). In this way, the generator gets progressively smaller. You could also do it with lists, but I find a generator of the URLs still to be pulled works reliably. For an object, you'd change the dictionary key assignment/access (update/in) to attribute assignment/access (setattr/hasattr), as sketched below.
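
A minimal sketch of that object-based variant (Thing here is a hypothetical class, not part of the module above):

class Thing:
    def __init__(self, url):
        self.url = url

def make_urlset(things):
    """Make a URL generator (empty once every Thing has been processed)"""
    return (t.url for t in things if not hasattr(t, "computed_value"))

# ...and inside process_thing, instead of thing.update({...}):
# setattr(thing, "computed_value", "result goes here")
# (the lookup in process_thing would then compare against t.url
#  rather than t.get("url"))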

Codee answered 9/8, 2021 at 16:10

I wanted to post a working version of the code using futures; it has virtually the same run-time:


import httpx
import asyncio
import time

#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)

#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)

#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  
  tasks = []

  #
  ## The following block of code builds a list of tasks,
  ## one per ticker URL
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    tasks.append(asyncio.ensure_future(async_pull(url)))

  start_time = time.time()

  #
  ## Await all the tasks; the HTTP requests run concurrently
  ## on the event loop
  await asyncio.gather(*tasks)

  #
  ## Calculate time lapsed
  finish_time = time.time()
  elapsed_time = finish_time - start_time
  print(f"\n Time spent processing: {elapsed_time} ")


# Start from here
if __name__ == "__main__":
  asyncio.run(build_task())
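
As a side note, on Python 3.7+ asyncio.create_task is the preferred spelling of asyncio.ensure_future for this purpose (it must be called from inside a running coroutine, which build_task is). The loop would then read:

  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    tasks.append(asyncio.create_task(async_pull(url)))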

Annora answered 27/5, 2021 at 20:39
