RuntimeError: There is no current event loop in thread in async + apscheduler

8

119

I have an async function and need to run it with APScheduler every N minutes. Here is the Python code:

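import asyncio
import os

from aiohttp import ClientSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler

URL_LIST = ['<url1>',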
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = []  # list of tasks, one per url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

But when I try to run it, I get the following error:

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

Could you please help me with this? I am using Python 3.6 and APScheduler 3.3.1.

Sendoff answered 13/10, 2017 at 10:27 Comment(0)
17

Just pass fetch_all to scheduler.add_job() directly. The asyncio scheduler supports coroutine functions as job targets.

If the target callable is not a coroutine function, it will be run in a worker thread (due to historical reasons), hence the exception.
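
A minimal sketch of that change, assuming APScheduler 3.x is installed. The fetch_all body here is a stand-in that only yields to the loop (the real one would use aiohttp as in the question), and build_scheduler is a hypothetical helper name, not part of APScheduler.

```python
import asyncio


async def fetch_all(urls):
    """Stand-in for the question's coroutine: pretend to fetch each URL."""
    async def fetch(url):
        await asyncio.sleep(0)  # placeholder for the real HTTP request
        return url

    return await asyncio.gather(*(fetch(url) for url in urls))


def build_scheduler(urls, seconds=15):
    """Hypothetical helper: schedule the coroutine function itself as the job."""
    # Imported here so that fetch_all stays usable without APScheduler.
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    scheduler = AsyncIOScheduler()
    # fetch_all is a coroutine function, so the asyncio scheduler runs it in
    # the event loop instead of handing it to a worker thread.
    scheduler.add_job(fetch_all, trigger='interval', seconds=seconds, args=[urls])
    return scheduler
```

In the question's script, this amounts to passing fetch_all instead of demo_async to add_job(); the rest of the __main__ block can stay as it is.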

Linares answered 14/10, 2017 at 8:40 Comment(3)
Hello, and thank you for the response. Could you explain why using scheduler.add_job() is better than adding a second event loop? @Flickinger mentions in his response that the issue exists because the code is not running in the main thread. - Beatabeaten
The second event loop just adds more complexity, for no benefit at all. - Flatfooted
Thank you for the comment. Can you elaborate a bit on the ideal scenario for adding a second event loop? If I understand correctly, it adds a second process that can handle parallel requests, while the scheduler goes for a multithreaded solution (which is handled concurrently but not in parallel in CPython due to the GIL, IMHO). - Beatabeaten
208

In your def demo_async(urls), try to replace:

loop = asyncio.get_event_loop()

with:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
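
For the case where the function really does run in a worker thread, here is a minimal sketch of how that replacement fits into a complete function. The names work, run_in_own_loop and run_from_worker_thread are illustrative, and closing the loop afterwards is an addition that the snippet above does not show.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def work(value):
    """Illustrative coroutine standing in for fetch_all(urls)."""
    await asyncio.sleep(0)
    return value * 2


def run_in_own_loop(value):
    """Create a loop for the current thread, run one coroutine, then clean up."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(work(value))
    finally:
        # Detach and close the loop so repeated calls do not leak loops.
        asyncio.set_event_loop(None)
        loop.close()


def run_from_worker_thread(value):
    """Call run_in_own_loop from a pool thread, where no loop exists by default."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(run_in_own_loop, value).result()
```

Anything created inside this loop (sessions, tasks, futures) belongs to it and should not be shared with a loop running in another thread.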
Intervenient answered 15/10, 2017 at 0:24 Comment(10)
Do not do this! This will just cause a second event loop to be run. It is completely pointless, given APScheduler's native support of coroutine functions. Bad things will happen if you try to make things running in one event loop interact with the first one. - Flatfooted
Sometimes you want a second event loop; sometimes you don't. - Orchestra
I found this necessary when running a loop in a separate thread. - Wed
Hi @AlexGrönholm, could you please explain why this is bad? That would be of tremendous value. - Clingy
Running multiple event loops complicates matters and in this particular case it is completely unnecessary. Furthermore, subprocess support will not work on Linux in threads other than the main thread. Does this answer your question? - Flatfooted
This makes another loop. Keep in mind that things in one loop cannot be used in another loop. - Alkane
@AlexGrönholm, can you please expand on your 2nd comment? You could even point me to a blog post or documentation. I need to use get_event_loop in a newly spawned process, but I get this error. The above answer will solve my problem. - Cissie
@Cissie I think you need to ask this as a new question here. - Flatfooted
Related Q&A on the DeprecationWarning version of this RuntimeError: https://mcmap.net/q/188768/-asyncio-get_event_loop-deprecationwarning-there-is-no-current-event-loop - Krug
After much searching, I opted for this solution: https://mcmap.net/q/188769/-when-to-use-asyncio-get_running_loop-vs-asyncio-get_event_loop - Krug
48

The important thing that hasn't been mentioned is why the error occurs. For me personally, knowing why the error occurs is as important as solving the actual problem.

Let's take a look at the implementation of the get_event_loop of BaseDefaultEventLoopPolicy:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

You can see that the self.set_event_loop(self.new_event_loop()) is only executed if all of the below conditions are met:

  • self._local._loop is None - _local._loop is not set
  • not self._local._set_called - set_event_loop hasn't been called yet
  • isinstance(threading.current_thread(), threading._MainThread) - current thread is the main one (this is not True in your case)

Therefore the exception is raised, because no loop is set in the current thread:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)
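
The behaviour can be reproduced without APScheduler. The following is a small sketch using only the standard library; probe_event_loop and probe_in_worker_thread are made-up names, and the thread pool stands in for the executor that runs non-coroutine jobs.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def probe_event_loop():
    """Report what asyncio.get_event_loop() does in the calling thread."""
    try:
        asyncio.get_event_loop()
    except RuntimeError as exc:
        return str(exc)
    return 'a loop is available'


def probe_in_worker_thread():
    """Run the probe in a pool thread, which has no event loop set."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(probe_event_loop).result()
```

Calling probe_in_worker_thread() returns the text of the same RuntimeError as in the question, because the worker thread is not the main thread and nothing has called set_event_loop() in it.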
Flickinger answered 21/3, 2019 at 10:29 Comment(3)
Do you understand why the isinstance... condition is important? Why do we only create an event loop in the _MainThread? - Clingy
Because on Linux, SIGCHLD is only delivered to the main thread, so terminated subprocesses can be acknowledged only by an event loop running in the main thread. - Flatfooted
This was super helpful. Because asyncio.get_event_loop() throws RuntimeError("There is no current event loop in thread 'MainThread'."), my approach is to call it in a try:, followed by except RuntimeError: loop = asyncio.new_event_loop(). - Lipoid
17

I had a similar issue where I wanted my asyncio module to be callable from a non-asyncio script (which was running under gevent... don't ask...). The code below resolved my issue because it tries to get the current event loop, but will create one if there isn't one in the current thread. Tested in python 3.9.11.

try:
    loop = asyncio.get_event_loop()
except RuntimeError as e:
    if str(e).startswith('There is no current event loop in thread'):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        raise
Dexamethasone answered 12/5, 2022 at 17:45 Comment(4)
This answer is identical to the highest-voted answer (it only adds a try-except), which is criticized (see its comments): https://mcmap.net/q/186338/-runtimeerror-there-is-no-current-event-loop-in-thread-in-async-apscheduler - Krug
@Krug That answer is criticized only by you, and without any justification. And this answer is also a useful one, in cases where you want to obtain a reference to the default event loop but you don't know if there is a currently running loop or not. It seems you do not understand the questions or the purpose of the code in both cases. - Bencion
For reference, I never criticized the answer I linked above. I would also like to remind users that the rules of SO include being nice to each other, and to refer to them when writing disagreeing comments. - Krug
@PaulCornelius this way, at least there won't be two loops, AFAICT? - Overtop
7

Use asyncio.run() instead of directly using the event loop. It creates a new loop and closes it when finished.

This is what the implementation of asyncio.run() looks like:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()
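
Applied to a synchronous entry point like the question's demo_async, this could look roughly as follows. It is a sketch under the assumption of Python 3.7 or later (where asyncio.run() exists); fetch_all is a stand-in that does no real HTTP work, and call_from_worker_thread is an illustrative name.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def fetch_all(urls):
    """Stand-in coroutine; inside it, the running loop is always available."""
    loop = asyncio.get_running_loop()  # safe: we are inside a running loop
    await asyncio.sleep(0)
    return [(url, loop.is_running()) for url in urls]


def demo_async(urls):
    """Synchronous entry point that never touches get_event_loop()."""
    return asyncio.run(fetch_all(urls))


def call_from_worker_thread(urls):
    """asyncio.run() also works in a thread that has no event loop yet."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(demo_async, urls).result()
```

Code that needs the loop should ask for it with asyncio.get_running_loop() from inside a coroutine, since asyncio.run() unsets the current loop again when it finishes.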
Wilmot answered 4/6, 2019 at 15:19 Comment(2)
I'm using asyncio.run() but still have this issue... - Selfless
Using asyncio.run() in the main script and trying to invoke asyncio.get_event_loop() in another class will cause this error to show up. - Buy
4

Since this question continues to appear on the first page, I will write my problem and my answer here.

I had a RuntimeError: There is no current event loop in thread 'Thread-X'. when using flask-socketio and Bleak.


Edit: well, I refactored my file and made a class.

I initialized the loop in the constructor, and now everything is working fine:

class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

Usage:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()

Original answer:

The solution was stupid. I did not pay attention to what I did, but I had moved some imports out of a function, like this:

import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        ...  # my code that keeps throwing exceptions

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

So I thought that I needed to change something in my code, and I created a new event loop using this piece of code just before the line with get_event_loop():

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

At this moment I was pretty happy, since I had a loop running.

But it was not responding, and since my code relied on a timeout to return some values, that was pretty bad for my app.

It took me nearly two hours to figure out that the problem was the import, and here is my (working) code:

def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        ...  # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list
Appose answered 13/6, 2019 at 12:34 Comment(1)
RuntimeError: There is no current event loop in thread 'MainThread'. - Marjorie
3

After reading the given answers, I only managed to fix my websocket thread by using the hint ("try to replace") from https://mcmap.net/q/186338/-runtimeerror-there-is-no-current-event-loop-in-thread-in-async-apscheduler on this page.

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

The documentation of BaseDefaultEventLoopPolicy explains

Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop.

So when using a thread one has to create the loop.

I also had to reorder my code, so my final code is:

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # !!! Place code after setting the loop !!!
    server = Server()
    start_server = websockets.serve(server.ws_handler, 'localhost', port)
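
As a rough, standard-library-only sketch of the same idea, a thread can own its loop and accept work from other threads. All names here (handler, start_loop_thread, stop_loop_thread, submit) are illustrative, and handler merely stands in for the websocket server logic.

```python
import asyncio
import threading


async def handler(message):
    """Illustrative coroutine standing in for the websocket server logic."""
    await asyncio.sleep(0)
    return message.upper()


def start_loop_thread():
    """Start a daemon thread that owns and runs its own event loop."""
    loop = asyncio.new_event_loop()

    def run():
        asyncio.set_event_loop(loop)  # make it the current loop in this thread
        loop.run_forever()
        loop.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return loop, thread


def stop_loop_thread(loop, thread):
    """Ask the loop to stop from outside its thread, then wait for the thread."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join()


def submit(loop, message, timeout=5):
    """Hand a coroutine to the loop thread and block until its result is ready."""
    future = asyncio.run_coroutine_threadsafe(handler(message), loop)
    return future.result(timeout)
```

Other threads talk to the loop only through the thread-safe calls (run_coroutine_threadsafe and call_soon_threadsafe), which keeps everything bound to that loop inside the thread that runs it.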
Boldface answered 11/5, 2022 at 19:39 Comment(0)
-1

In my case the line was like this

asyncio.get_event_loop().run_until_complete(test())

I replaced the above line with this one, which solved my problem:

asyncio.run(test())
Wolof answered 3/8, 2022 at 11:9 Comment(0)
