Asyncio imap fetch mails python3
Asked Answered
E

2

8

I'm testing with the asyncio module, however I need a hint / suggesstion how to fetch large emails in an async way.

I have a list with usernames and passwords for the mail accounts.

data = [
    {'usern': '[email protected]', 'passw': 'x'},
    {'usern': '[email protected]', 'passw': 'y'},
    {'usern': '[email protected]', 'passw': 'z'} (...)
]

I thought about:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()

However, the long part is to download the email attachments.

Email:

@asyncio.coroutine
def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, data = connection.fetch(num, '(RFC822)')
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()

How could I process all (downloading attachments) mails in an async way?

Equiponderate answered 30/7, 2014 at 13:32 Comment(1)
Imaplib is a synchronous library, you can fake it with threads and the like, but it will never be truly asynchronous.Federica
S
7

If you don't have an asynchronous I/O-based imap library, you can just use a concurrent.futures.ThreadPoolExecutor to do the I/O in threads. Python will release the GIL during the I/O, so you'll get true concurrency:

def init_connection(d):    
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()
    return connection

local = threading.local() # We use this to get a different connection per thread
def do_fetch(num, d, rfc):
    try:
        connection = local.connection
    except AttributeError:
        connnection = local.connection = init_connection(d)
    return connnection.fetch(num, rfc)

@asyncio.coroutine
def get_attachment(d, pool):
    connection = init_connection(d)    
    # list all available mails
    typ, data = connection.search(None, 'ALL')

    # Kick off asynchronous tasks for all the fetches
    loop = asyncio.get_event_loop()
    futs = [asyncio.create_task(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
                for num in data[0].split()]

    # Process each fetch as it completes
    for fut in asyncio.as_completed(futs):
        typ, data = yield from fut
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()    


loop = asyncio.get_event_loop()
pool = ThreadPoolExecutor(max_workers=5)  # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
loop.run_until_complete(asyncio.wait([get_attachment(d, pool) for d in data]))
loop.close()

This isn't quite as nice as a truly asynchronous I/O-based solution, because you've still got the overhead of creating the threads, which limits scalability and adds extra memory overhead. You also do get some GIL slowdown because of all the code wrapping the actual I/O calls. Still, if you're dealing with less than thousands of mails, it should still perform ok.

We use run_in_executor to use the ThreadPoolExecutor as part of the asyncio event loop, asyncio.async to wrap the coroutine object returned in a asyncio.Future, and as_completed to iterate through the futures in the order they complete.

Edit:

It seems imaplib is not thread-safe. I've edited my answer to use thread-local storage via threading.local, which allows us to create one connection object per-thread, which can be re-used for the entire life of the thread (meaning you create num_workers connection objects only, rather than a new connection for every fetch).

Schweiker answered 30/7, 2014 at 14:19 Comment(4)
Thanks, this approach is what I was looking for. However receive some errors while running workers > 1. See here: gist.github.com/wiesson/5d611016ec223993e25d Why you are calling the asyncio.get_event_loop in the get_attachment loop?Equiponderate
@Equiponderate There's actually no need to call get_event_loop, since you can use the global one. Re: the errors you're seeing, my guess is that imaplib isn't thread safe, and would therefore require a connection object per-thread. You can use threading.local() to help with that. I'll edit my answer.Schweiker
asyncio.async SyntaxError: invalid syntaxLikelihood
@Likelihood Since I've posted this answer asyncio.async has been deprecated and removed. asyncio.create_task should be used instead. I've updated the answer accordingly.Schweiker
E
6

I had the same needs : fetching emails with python 3 fully async. If others here are interested I pushed an asyncio IMAP lib here : https://github.com/bamthomas/aioimaplib

You can use it like this :

import asyncio
from aioimaplib import aioimaplib

@asyncio.coroutine
def wait_for_new_message(host, user, password):
    imap_client = aioimaplib.IMAP4(host=host)
    yield from imap_client.wait_hello_from_server()

    yield from imap_client.login(user, password)
    yield from imap_client.select()

    asyncio.async(imap_client.idle())
    id = 0
    while True:
        msg = yield from imap_client.wait_server_push()
        print('--> received from server: %s' % msg)
        if 'EXISTS' in msg:
            id = msg.split()[0]
            imap_client.idle_done()
            break

    result, data = yield from imap_client.fetch(id, '(RFC822)')
    email_message = email.message_from_bytes(data[0])

    attachments = []
    body = ''
    for part in email_message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
            body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
        else:
            attachments.append(
                {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})

    print('attachments : %s' % attachments)
    print('body : %s' % body)
    yield from imap_client.logout()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_new_message('my.imap.server', 'user', 'pass'))

Large emails with attachments are also downloaded with asyncio.

Explanatory answered 5/7, 2016 at 14:44 Comment(3)
As a warning: there exist servers that will send * n EXISTS (n unchanged) as a keepalive during IDLE. On these servers you will fetch the same message repeatedly here.Federica
There's an 'email' is not defined error on running the code. I think the line import email should be included.Manstopper
There's also an error Module asyncio has no async memberManstopper

© 2022 - 2024 — McMap. All rights reserved.