How to execute two "aggregate" functions (like sum) concurrently, feeding them from the same iterator?

Imagine we have an iterator, say iter(range(1, 1000)), and two functions that each accept an iterator as their only parameter, say sum() and max(). In the SQL world we would call them aggregate functions.

Is there any way to obtain results of both without buffering the iterator output?

To do it, we would need to pause and resume the execution of the aggregate functions, in order to feed them both the same values without storing them. Is there perhaps a way to express this using async features, without sleeps?

Mundane answered 26/4, 2018 at 9:16 Comment(6)
sum(a,b,c) is the same as sum(a,sum(b,c)), likewise for max. Can we assume that that's always the case? Then just apply the aggregator functions for each element in the iterator.Prothrombin
@Prothrombin Nice catch! I can't speak for the OP, but assuming that does sound like a bit of a stretch because then you're really working with binary functions (+ and the binary max), and not with aggregate functions. The question refers to aggregate functions in general, describing them as "accepting an iterator as the only parameter", only using sum and max as examples. In that context I would argue that an answer needs to work for aggregates that cannot be reduced to a stateless series of applications of a binary function (e.g. an aggregate that returns the median of the sequence).Outrush
@Outrush I thought it would be a nice and simple way that works with O(1) memory and without threads, but you are right; average would be another example. (Also, it's pretty slow.)Prothrombin
@Prothrombin "Also, it's pretty slow." I tried it out of curiosity, and for sum and max and range(10000) it clocks at 4.9 ms on my machine, way faster than the solutions from my answer (except the initial ones that buffer everything).Outrush
@Outrush I just compared it to buffering the entire iterator with list or tee. Anyway, I posted it as an answer, maybe it's useful in some cases. At least it's simpler than the thread-based approaches.Prothrombin
@Prothrombin "At least it's simpler than the thread-based approaches." For the record, the approaches in my answer are not thread-based, at least not all of them. They're still complex, though, but that's probably unavoidable without loss of generality.Outrush
O
48

Let's consider how to apply two aggregate functions to the same iterator, which we can only exhaust once. The initial attempt (which hardcodes sum and max for brevity, but is trivially generalizable to an arbitrary number of aggregate functions) might look like this:

def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m

This implementation has the downside that it stores all the generated elements in memory at once, despite both functions being perfectly capable of stream processing. The question anticipates this cop-out and explicitly requests the result to be produced without buffering the iterator output. Is it possible to do this?
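
For reference, the generalization to an arbitrary number of aggregate functions mentioned above might look like the following minimal sketch. The name aggregate_buffer is hypothetical and not part of the original answer; the function still buffers everything, so it only serves as a baseline.

```python
def aggregate_buffer(it, *aggregates):
    # Materialize the iterator once; this is exactly the buffering the
    # question wants to avoid, generalized to any number of aggregates.
    content = list(it)
    # Each aggregate gets the full list, so they can run one after another.
    return [f(content) for f in aggregates]


if __name__ == "__main__":
    print(aggregate_buffer(iter(range(1, 1000)), sum, max))
```

The results come back in the same order as the aggregate functions were passed.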

Serial execution: itertools.tee

It certainly seems possible. After all, Python iterators are external, so every iterator is already capable of suspending itself. How hard can it be to provide an adapter that splits an iterator into two new iterators that provide the same content? Indeed, this is exactly the description of itertools.tee, which appears perfectly suited to parallel iteration:

import itertools

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX: exhausts it1 before max() even starts
    m = max(it2)
    return p, m

The above produces the correct result, but doesn't work the way we'd like it to. The trouble is that we're not iterating in parallel. Aggregate functions like sum and max never suspend - each insists on consuming all of the iterator content before producing the result. So sum will exhaust it1 before max has had a chance to run at all. Exhausting elements of it1 while leaving it2 alone will cause those elements to be accumulated inside an internal FIFO shared between the two iterators. That's unavoidable here - since max(it2) must see the same elements, tee has no choice but to accumulate them. (For more interesting details on tee, refer to this post.)

In other words, there is no difference between this implementation and the first one, except that the first one at least makes the buffering explicit. To eliminate buffering, sum and max must run in parallel, not one after the other.
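
The serial behaviour can be made visible with a small tracing sketch. The helper names (trace_tee_order, source, logged) are made up for this illustration; the only library call is itertools.tee. It records when the source produces an element and when each aggregate sees it.

```python
import itertools


def trace_tee_order(n):
    events = []

    def source():
        # The underlying one-shot iterator; logs every element it hands out.
        for i in range(n):
            events.append(("produced", i))
            yield i

    def logged(name, it):
        # Pass-through wrapper that logs what each aggregate consumes.
        for x in it:
            events.append((name, x))
            yield x

    it1, it2 = itertools.tee(source())
    total = sum(logged("sum", it1))
    largest = max(logged("max", it2))
    return total, largest, events


if __name__ == "__main__":
    for event in trace_tee_order(3)[2]:
        print(event)
```

In the recorded events, every "produced" and "sum" entry appears before the first "max" entry, which means all elements had to sit inside tee until max got its turn.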

Threads: concurrent.futures

Let's see what happens if we run the aggregate functions in separate threads, still using tee to duplicate the original iterator:

import concurrent.futures
import itertools

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))

    return sum_future.result(), max_future.result()

Now sum and max actually run in parallel (as much as the GIL permits), threads being managed by the excellent concurrent.futures module. It has a fatal flaw, however: for tee not to buffer the data, sum and max must process their items at exactly the same rate. If one is even a little bit faster than the other, they will drift apart, and tee will buffer all intermediate elements. Since there is no way to predict how fast each will run, the amount of buffering is both unpredictable and has the nasty worst case of buffering everything.

To ensure that no buffering occurs, tee must be replaced with a custom generator that buffers nothing and blocks until all the consumers have observed the previous value before proceeding to the next one. As before, each consumer runs in its own thread, but now the calling thread is busy running a producer, a loop that actually iterates over the source iterator and signals that a new value is available. Here is an implementation:

import concurrent.futures
import threading

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = 0  # same as the initial val_id, so the first wait blocks until a value is sent
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()

    return sum_future.result(), max_future.result()

This is quite a lot of code for something so simple conceptually, but it is necessary for correct operation.

produce() loops over the outside iterator and sends the items to the consumers, one value at a time. It uses a barrier, a convenient synchronization primitive added in Python 3.2, to wait until all consumers are done with the old value before overwriting it with the new one in next_val. Once the new value is actually ready, a condition is broadcast. consume() is a generator that transmits the produced values as they arrive, until detecting STOP. The code can be generalized to run any number of aggregate functions in parallel by creating consumers in a loop, and adjusting their number when creating the barrier.
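
A minimal sketch of that generalization, assuming at least one aggregate function is passed. The name aggregate_threads is hypothetical. The sketch starts last_val_id at the same value as val_id so that the first wait cannot return before the producer has published a value.

```python
import concurrent.futures
import threading


def aggregate_threads(it, *aggregates):
    STOP = object()
    n = len(aggregates)  # assumed to be at least 1
    next_val = None
    val_id = 0
    # Every consumer plus the producer must arrive before the value changes.
    consumed = threading.Barrier(n + 1)
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()  # wait until all consumers are done with the old value
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def consume():
        last_val_id = 0  # equal to the initial val_id
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(n) as executor:
        # One worker thread per aggregate; f=f binds the current function.
        futures = [executor.submit(lambda f=f: f(consume()))
                   for f in aggregates]
        # The calling thread acts as the producer.
        for elem in it:
            send(elem)
        send(STOP)

    return [fut.result() for fut in futures]


if __name__ == "__main__":
    print(aggregate_threads(iter(range(1, 200)), sum, max, min))
```

The pool size must equal the number of aggregates, because every consumer has to be running for the barrier to release.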

The downside of this implementation is that it requires creation of threads (possibly alleviated by making the thread pool global) and a lot of very careful synchronization at each iteration pass. This synchronization destroys performance - this version is almost 2000 times slower than the single-threaded tee, and 475 times slower than the simple but non-deterministic threaded version.

Still, as long as threads are used, there is no avoiding synchronization in some form. To completely eliminate synchronization, we must abandon threads and switch to cooperative multi-tasking. The question is: is it possible to suspend execution of ordinary synchronous functions like sum and max in order to switch between them?

Fibers: greenlet

It turns out that the greenlet third-party extension module enables exactly that. Greenlets are an implementation of fibers, lightweight micro-threads that switch between each other explicitly. This is sort of like Python generators, which use yield to suspend, except greenlets offer a much more flexible suspension mechanism, allowing one to choose who to suspend to.

This makes it fairly easy to port the threaded version of max_and_sum to greenlets:

import greenlet

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()

    return sum_result[0], max_result[0]

The logic is the same, but with less code. As before, produce produces values retrieved from the source iterator, but its send doesn't bother with synchronization, as it doesn't need to when everything is single-threaded. Instead, it explicitly switches to every consumer in turn to do its thing, with the consumer dutifully switching right back. After going through all consumers, the producer is ready for the next iteration pass.

Results are retrieved using an intermediate single-element list because greenlet doesn't provide access to the return value of the target function (and neither does threading.Thread, which is why we opted for concurrent.futures above).

There are downsides to using greenlets, though. First, they don't come with the standard library; you need to install the greenlet extension. Second, greenlet is inherently non-portable because the stack-switching code is not supported by the OS and the compiler and can be considered somewhat of a hack (although an extremely clever one). A Python targeting WebAssembly or JVM or GraalVM would be very unlikely to support greenlet. This is not a pressing issue, but it's definitely something to keep in mind for the long haul.

Coroutines: asyncio

As of Python 3.5, Python provides native coroutines. Unlike greenlets, and similar to generators, coroutines are distinct from regular functions and must be defined using async def. Coroutines can't be easily executed from synchronous code; they must instead be processed by a scheduler which drives them to completion. The scheduler is also known as an event loop because its other job is to receive IO events and pass them to appropriate callbacks and coroutines. In the standard library, this is the role of the asyncio module.

Before implementing an asyncio-based max_and_sum, we must first resolve a hurdle. Unlike greenlet, asyncio is only able to suspend execution of coroutines, not of arbitrary functions. So we need to replace sum and max with coroutines that do essentially the same thing. This is as simple as implementing them in the obvious way, only replacing for with async for, enabling the async iterator to suspend the coroutine while waiting for the next value to arrive:

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)

One could reasonably ask if providing a new pair of aggregate functions is cheating; after all, the previous solutions were careful to use existing sum and max built-ins. The answer will depend on the exact interpretation of the question, but I would argue that the new functions are allowed because they are in no way specific to the task at hand. They do the exact same thing the built-ins do, but consuming async iterators. I suspect that the only reason such functions don't already exist somewhere in the standard library is due to coroutines and async iterators being a relatively new feature.
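
To see such an aggregate coroutine in isolation, here is a minimal sketch that feeds asum from an ordinary iterable. The helpers as_async and run_asum are hypothetical names introduced for this example, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio


async def asum(it):
    # Same as the asum shown above: sum() for async iterators.
    s = 0
    async for elem in it:
        s += elem
    return s


async def as_async(iterable):
    # Hypothetical adapter: exposes a plain iterable as an async iterator.
    for elem in iterable:
        yield elem


def run_asum(iterable):
    # Drive the coroutine to completion from synchronous code.
    return asyncio.run(asum(as_async(iterable)))


if __name__ == "__main__":
    print(run_asum(range(1, 1000)))
```

Nothing in asum depends on where the values come from, which is what makes it reusable with the shared producer introduced next.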

With that out of the way, we can proceed to write max_and_sum as a coroutine:

import asyncio

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()

    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
                                   produce())
    return s, m

Although this version is based on switching between coroutines inside a single thread, just like the one using greenlet, it looks different. asyncio doesn't provide explicit switching of coroutines; it bases task switching on the await suspension/resumption primitive. The target of await can be another coroutine, but also an abstract "future", a value placeholder which will be filled in later by some other coroutine. Once the awaited value becomes available, the event loop automatically resumes execution of the coroutine, with the await expression evaluating to the provided value. So instead of produce switching to consumers, it suspends itself by awaiting a future that will arrive once all the consumers have observed the produced value.
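
The future mechanism can be shown on its own with a tiny sketch. The names future_handoff, waiter and setter are made up for illustration, and asyncio.get_running_loop and asyncio.run assume Python 3.7 or later.

```python
import asyncio


async def future_handoff():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # placeholder for a value that arrives later

    async def waiter():
        # Suspends here until some other coroutine fills in the future.
        return await fut

    async def setter():
        # Filling in the future makes the event loop resume the waiter.
        fut.set_result(42)

    value, _ = await asyncio.gather(waiter(), setter())
    return value


if __name__ == "__main__":
    print(asyncio.run(future_handoff()))
```

The waiter never names the setter; the two only share the future, which is the same decoupling that produce and consume rely on.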

consume() is an asynchronous generator, which is like an ordinary generator, except it creates an async iterator, which our aggregate coroutines are already prepared to accept by using async for. An async iterator's equivalent of __next__ is called __anext__ and is a coroutine, allowing the coroutine that exhausts the async iterator to suspend while waiting for the new value to arrive. When a running async generator suspends on an await, that is observed by async for as a suspension of the implicit __anext__ invocation. consume() does exactly that when it waits for the values provided by produce and, as they become available, transmits them to aggregate coroutines like asum and amax. Waiting is realized using the next_val future, which carries the next element from it. Awaiting that future inside consume() suspends the async generator, and with it the aggregate coroutine.
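
For comparison, the protocol that an async generator implements behind the scenes can be written out by hand. The following sketch uses a hypothetical Countdown class; the await asyncio.sleep(0) stands in for any suspension point, such as waiting on a future.

```python
import asyncio


class Countdown:
    """Hand-written async iterator yielding start, start-1, ..., 1."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            # The async counterpart of StopIteration.
            raise StopAsyncIteration
        # Suspension point: the coroutine running `async for` is
        # suspended here together with __anext__.
        await asyncio.sleep(0)
        self.current -= 1
        return self.current + 1


async def collect(ait):
    # `async for` calls __anext__ and awaits it on every pass.
    return [x async for x in ait]


if __name__ == "__main__":
    print(asyncio.run(collect(Countdown(3))))
```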

The advantage of this approach compared to greenlets' explicit switching is that it makes it much easier to combine coroutines that don't know of each other into the same event loop. For example, one could have two instances of max_and_sum running in parallel (in the same thread), or run a more complex aggregate function that invoked further async code to do calculations.
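
As a rough sketch of that claim, the coroutine can be generalized to any number of aggregates and two independent instances can be run in the same event loop. The names aggregate_asyncio and two_at_once are hypothetical, and get_running_loop and asyncio.run assume Python 3.7 or later.

```python
import asyncio


async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s


async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest


async def aggregate_asyncio(it, *aggregates):
    if not aggregates:
        return []
    loop = asyncio.get_running_loop()
    STOP = object()
    n = len(aggregates)
    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = n  # consumers that still have to see the current value

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed  # resumes once every consumer has seen elem
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                # Last consumer of this round: release the others and
                # set up fresh futures for the next round.
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = n
            else:
                await consumed

    *results, _ = await asyncio.gather(
        *(f(consume()) for f in aggregates), produce())
    return results


async def two_at_once():
    # Two independent instances interleaved in one thread.
    return await asyncio.gather(
        aggregate_asyncio(range(1, 1000), asum, amax),
        aggregate_asyncio(range(5), asum, amax))


if __name__ == "__main__":
    print(asyncio.run(two_at_once()))
```

Each call keeps its own futures and counter in its closure, so the two instances do not interfere with each other.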

The following convenience function shows how to run the above from non-asyncio code:

def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop (on Python 3.7 and later,
    # asyncio.run(coro) does the same job with a fresh loop)
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)

Performance

Measuring and comparing performance of these approaches to parallel execution can be misleading because sum and max do almost no processing, which over-stresses the overhead of parallelization. Treat these as you would treat any microbenchmarks, with a large grain of salt. Having said that, let's look at the numbers anyway!

Measurements were produced using Python 3.6. The functions were each run only once on range(10000), and timed by subtracting time.time() readings taken before and after the execution. Here are the results:

  • max_and_sum_buffer and max_and_sum_tee: 0.66 ms - almost exactly the same time for both, with the tee version being a bit faster.

  • max_and_sum_threads_simple: 2.7 ms. This timing means very little because of non-deterministic buffering, so this might be measuring the time to start two threads and the synchronization internally performed by Python.

  • max_and_sum_threads: 1.29 seconds, by far the slowest option, ~2000 times slower than the fastest one. This horrible result is likely caused by a combination of the multiple synchronizations performed at each step of the iteration and their interaction with the GIL.

  • max_and_sum_greenlet: 25.5 ms, slow compared to the initial version, but much faster than the threaded version. With a sufficiently complex aggregate function, one can imagine using this version in production.

  • max_and_sum_asyncio: 351 ms, almost 14 times slower than the greenlet version. This is a disappointing result because asyncio coroutines are more lightweight than greenlets, and switching between them should be much faster than switching between fibers. It is likely that the overhead of running the coroutine scheduler and the event loop (which in this case is overkill given that the code does no IO) is destroying the performance on this micro-benchmark.

  • max_and_sum_asyncio using uvloop: 125 ms. This is more than twice the speed of regular asyncio, but still almost 5x slower than greenlet.
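
The measurement procedure described above can be sketched as follows. The helper name time_once is hypothetical; this is not the original benchmarking script, just one way to take a single wall-clock reading around a call.

```python
import time


def time_once(func, *args):
    # Single run, wall-clock time; very noisy for short calls.
    # time.perf_counter() would be a monotonic, higher-resolution choice.
    start = time.time()
    result = func(*args)
    elapsed = time.time() - start
    return result, elapsed


if __name__ == "__main__":
    result, elapsed = time_once(sum, range(10000))
    print(f"result={result}, elapsed={elapsed * 1000:.3f} ms")
```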

Running the examples under PyPy doesn't bring a significant speedup; in fact, most of the examples run slightly slower, even after running them several times to ensure JIT warmup. The asyncio function requires a rewrite not to use async generators (since PyPy as of this writing implements Python 3.5), and executes in somewhat under 100 ms. This is comparable to CPython+uvloop performance, i.e. an improvement over plain asyncio, but still well short of greenlet.

Outrush answered 28/7, 2018 at 9:7 Comment(2)
An excellent answer, but why roll back the edit that used the concurrent.futures executor API correctly?Guava
@Guava Thanks for the suggestion, but both versions are in fact correct. In this case a lambda was used intentionally to make the code consistent with the later version that changes lambda: sum(it1) to lambda: sum(consume()), and where the transformation to the positional argument wouldn't work.Outrush
P
5

If it holds for your aggregate functions that f(a,b,c,...) == f(a, f(b, f(c, ...))), then you could just cycle through your functions and feed them one element at a time, each time combining the new element with the result of the previous application, like reduce would do, e.g. like this:

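def aggregate(iterator, *functions):
    iterator = iter(iterator)  # also accept plain iterables such as range()
    first = next(iterator)     # raises StopIteration on empty input
    result = [first] * len(functions)
    for item in iterator:
        for i, f in enumerate(functions):
            # each function receives a two-element tuple: (running result, new item)
            result[i] = f((result[i], item))
    return result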

This is considerably slower (about 10-20 times) than just materializing the iterator in a list and applying the aggregate function on the list as a whole, or using itertools.tee (which basically does the same thing, internally), but it has the benefit of using no additional memory.
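
A variant of the same idea, sketched under the assumption that a binary form of each aggregate is available (operator.add for sum, the two-argument max and min), avoids building a tuple for every element. The name aggregate_binary is hypothetical; no timing claim is made for it.

```python
import operator


def aggregate_binary(iterable, *binary_functions):
    iterator = iter(iterable)
    try:
        first = next(iterator)
    except StopIteration:
        raise ValueError("aggregate_binary() arg is an empty sequence") from None
    # Every running result starts out as the first element.
    results = [first] * len(binary_functions)
    for item in iterator:
        for i, f in enumerate(binary_functions):
            # Fold the new item into the running result, like reduce().
            results[i] = f(results[i], item)
    return results


if __name__ == "__main__":
    print(aggregate_binary(range(1, 1000), operator.add, max, min))
```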

Note, however, that while this works well for functions like sum, min or max, it does not work for other aggregating functions, e.g. finding the mean or median element of an iterator, as mean(a, b, c) != mean(a, mean(b, c)). (For mean, you could of course just get the sum and divide it by the number of elements, but computing e.g. the median taking just one element at a time will be more challenging.)
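
The remark about the mean can be sketched like this: keep a running sum and a running count, and divide at the end. The name streaming_mean is made up for the example.

```python
def streaming_mean(iterable):
    total = 0
    count = 0
    for item in iterable:
        # Two pieces of state are enough; no element is stored.
        total += item
        count += 1
    if count == 0:
        raise ValueError("streaming_mean() arg is an empty sequence")
    return total / count


if __name__ == "__main__":
    print(streaming_mean(iter(range(1, 1000))))
```

The state here is a pair rather than a single value of the same type as the elements, which is why the mean does not fit the f(a, f(b, c)) pattern directly.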

Prothrombin answered 31/7, 2018 at 21:31 Comment(0)
B
0

I was trying to solve the same problem and understand asyncio a little better, and I came up with an answer that I believe is the same as user4815162342's, but uses an asyncio.Queue per consumer instead of creating two futures per iteration. Of course, it still has the downside of needing to rewrite all your aggregation functions as async def coroutines.

import asyncio


# Provided an iterable and an integer, return N iterables that will each yield
# the values of the input iterable. All iterables must be iterated
# simultaneously.
def tee_async(it, N):
    # larger queues would provide less context switching, but use more memory.
    qs = [asyncio.Queue(1) for _ in range(N)]
    # value to send to the queue to signal end of iteration
    STOP = object()

    # we have a sync iterator, we need an async iterator
    # use a queue to pass values from the sync iterator to here
    # implement an async iterator here to pass the values to the consumer coroutine
    # bug warning: this hangs if one of the consumers does not finish iterating
    async def QueueIterator(q: asyncio.Queue):
        while True:
            i = await q.get()
            print(f"got {i}")
            if i is STOP:
                break
            yield i

    # A coroutine to consume the input iterator
    async def IteratorConsumer():
        # note: the iterator might advance to the next value before the
        # consumers have started with the previous value. IE the first aggregator
        # will work on it[0] AFTER we have loaded it[1] here.
        for i in it:
            for j, q in enumerate(qs):
                print(f"putting {i} in {j}")
                await q.put(i)
        for j, q in enumerate(qs):
            print(f"putting STOP in {j}")
            await q.put(STOP)

    asyncio.create_task(IteratorConsumer())
    return [QueueIterator(q) for q in qs]


# Provided an iterable and a list of aggregation coroutines, send each iterated
# value to each aggregation coroutine and return a list of the return values.
async def aggregate(it, aggregators):
    tees = tee_async(it, len(aggregators))
    tasks = []

    # if using recent python, try: async with asyncio.TaskGroup() as tg:
    tasks = [asyncio.create_task(a(t)) for a, t in zip(aggregators, tees)]

    return [await t for t in tasks]


# Provided an iterable and a list of aggregation coroutines, send each iterated
# value to each aggregation coroutine and return a list of the return values.
def aggregate_without_storing_everything(it, aggregators):
    return asyncio.run(aggregate(it, aggregators))


async def amax(it):
    max = None
    async for i in it:
        if max is None or i > max:
            max = i
    return max


async def acount(it):
    c = 0
    async for i in it:
        c += 1
    return c


def main():
    print(aggregate_without_storing_everything(range(3), [amax, acount]))


if __name__ == "__main__":
    main()

I benchmarked this against the max_and_sum_asyncio_sync function: the queue-based approach here is significantly slower until the queue size is greater than 5. However, both are incredibly slow. The point of doing this is to handle an iteration that would not fit in memory, but in my tests such an iteration would take days to run, so this is not a good approach in practice.

Bernardina answered 10/9, 2023 at 9:4 Comment(0)
