How to integrate a custom task into the aiogram executor?

import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    # old style:
    # await bot.send_message(message.chat.id, message.text)

    chat_ids[message.message_id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)


async def periodic(sleep_for, queue):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            queue.put_nowait((id, f"{now}"))
            # await bot.send_message(id, f"{now}", disable_notification=True)


def run_tick(queue):
    newloop = asyncio.new_event_loop()
    asyncio.set_event_loop(newloop)
    asyncio.run(periodic(3, queue))


if __name__ == '__main__':
    queue = asyncio.Queue()
    Thread(target=run_tick, args=(queue,), daemon=True).start()
    executor.start_polling(dp, skip_updates=True)

I want to send messages to registered users with bot.send_message whenever an event occurs, but so far I have not managed to. Here is what I tried:

  1. Calling bot.send_message from another thread crashes with "Timeout context manager should be used inside a task".
  2. To work around this, I tried passing the messages through a queue, but I found no way to add my own task to the executor.

Is there any simple way to do this?


Edit: 2020-1-3

Here's a working example, based on the answer by @user4815162342.

import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
    chat_ids[message.from_user.id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)

async def periodic(sleep_for):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        print(f"{now}")
        for id in chat_ids:
            await bot.send_message(id, f"{now}", disable_notification=True)

if __name__ == '__main__':
    dp.loop.create_task(periodic(10))
    executor.start_polling(dp)
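
One caveat with a loop like periodic: if a single send raises (for instance on a network error), the exception ends the whole task and no further ticks go out. Below is a minimal sketch of one way to guard against that. It assumes a hypothetical `send` coroutine standing in for bot.send_message, and a `ticks` limit that exists only so the demo terminates.

```python
import asyncio
from datetime import datetime, timezone


async def periodic(sleep_for, chat_ids, send, ticks=None):
    """Call send(chat_id, text) for every chat each `sleep_for` seconds.

    `send` is a stand-in for bot.send_message (an assumption for this demo);
    `ticks` bounds the loop so the example ends. Pass None to loop forever.
    """
    done = 0
    while ticks is None or done < ticks:
        await asyncio.sleep(sleep_for)
        now = datetime.now(timezone.utc).isoformat()
        for chat_id in list(chat_ids):  # copy: handlers may add chats meanwhile
            try:
                await send(chat_id, now)
            except Exception as exc:  # keep the loop alive if one send fails
                print(f"send to {chat_id} failed: {exc!r}")
        done += 1


async def demo():
    delivered = []

    async def fake_send(chat_id, text):
        # Hypothetical sender: chat 2 always fails, the others succeed.
        if chat_id == 2:
            raise RuntimeError("simulated failure")
        delivered.append(chat_id)

    await periodic(0.01, {1: None, 2: None, 3: None}, fake_send, ticks=2)
    return delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Catching Exception leaves asyncio.CancelledError untouched on Python 3.8 and later, so the task can still be cancelled normally.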

Photoplay answered 27/12, 2019 at 14:46 Comment(0)

The initial problem was that you tried to call asyncio code from a different thread. To fix the resulting error you have created a new event loop while keeping the additional thread. As the saying goes, now you have two problems.

The queue idea looks unfinished because there is no code that reads from the queue; and even if there were, it wouldn't work because asyncio queues are not designed to be shared between event loops or between threads. To untangle the mess, you need to find a way to run your periodic updates from within the event loop, i.e. re-examine this assumption:

but there is no way to add my own task into executor.

Looking at the source of Executor, it appears to pick up the event loop from the dispatcher, which holds it in the publicly accessible loop attribute. That means that you can create a task simply by invoking the create_task method on that loop. For example:

if __name__ == '__main__':
    dp.loop.create_task(periodic(10))
    executor.start_polling(dp, skip_updates=True)

Now periodic can be formulated as in your initial attempt, minus the queue:

async def periodic(sleep_for):
    while True:
        await asyncio.sleep(sleep_for)
        now = datetime.utcnow()
        for id in chat_ids:
            await bot.send_message(id, f"{now}",
                                   disable_notification=True)

Please note that I haven't tested this because I do not use aiogram. A potential issue that you might need to address is that your chat_ids dict appears to contain message.message_id as key, whereas bot.send_message accepts a message.chat.id.
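
If some work really has to stay in a separate thread, the standard library provides asyncio.run_coroutine_threadsafe for submitting a coroutine to a loop that runs in another thread, rather than sharing an asyncio.Queue. Below is a minimal sketch without aiogram. The `notify` coroutine is a hypothetical stand-in for bot.send_message, and `sink` only records what was sent.

```python
import asyncio
import threading


async def notify(sink, chat_id, text):
    # Stand-in for bot.send_message; this body runs on the event loop's thread.
    sink.append((chat_id, text))


def worker(loop, sink, done):
    # Runs in a plain thread: hand the coroutine to the loop in a thread-safe way.
    future = asyncio.run_coroutine_threadsafe(notify(sink, 42, "tick"), loop)
    future.result(timeout=5)  # block this thread until the loop has run it
    # asyncio.Event is not thread-safe either, so set it from the loop's thread.
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    sink = []
    done = asyncio.Event()
    thread = threading.Thread(target=worker, args=(loop, sink, done), daemon=True)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    thread.join()
    return sink


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The coroutine still executes inside the event loop, so code that must run inside a task keeps working; only the scheduling request crosses the thread boundary.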

Stethoscope answered 29/12, 2019 at 16:2 Comment(1)
Thank you so much. I totally misunderstood how asyncio works. Now it works as expected thanks to your thoughtful explanation. – Photoplay

This code works for aiogram 2.24:

import asyncio
import logging
from aiogram import Bot, Dispatcher

...
dp = Dispatcher(bot)

async def periodical_task():
    while True:
        await asyncio.sleep(15)
        logging.info("Start periodic")


dp._loop_create_task(periodical_task())
Elisa answered 5/2, 2023 at 11:26 Comment(0)

In newer versions the methods provided above no longer work. I am using the following versions:

CPython: 3.9.15 (main, Oct 25 2022, 06:48:03) [GCC 10.2.1 20210110]
OS: Linux-5.15.0-1029-oracle-aarch64-with-glibc2.31
aiogram: 2.23.1
aiohttp: 3.8.3
uvloop: 0.16.0
JSON mode: json

The start_polling function in the executor calls asyncio.get_event_loop() and then loop.run_forever(); I found this in the aiogram source. The idea is to create the tasks with the asyncio module directly instead of through aiogram. What worked for me was to define a function that runs on startup and to create the asyncio tasks inside it. When executor.start_polling() is called, it also runs the tasks created there:


import asyncio
import logging

from aiogram import Bot, Dispatcher, executor


async def loop_fun_1():
    while True:
        logging.debug("FUN1")
        await asyncio.sleep(5)


async def loop_fun_2():
    while True:
        logging.debug("FUN2")
        await asyncio.sleep(5)


# The executor calls on_startup callbacks with the dispatcher as argument
async def on_startup_launch(dispatcher):
    asyncio.create_task(loop_fun_1())
    asyncio.create_task(loop_fun_2())


bot = Bot(token="***")
dp = Dispatcher(bot=bot)
executor.start_polling(
        dispatcher=dp,
        skip_updates=True,
        on_startup=on_startup_launch
    )
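
One detail worth noting for this pattern: the asyncio.create_task documentation recommends keeping a reference to the returned task, because the event loop itself only holds weak references to tasks. Below is a minimal sketch without aiogram. `background_tasks` and `spawn` are hypothetical helper names, the loops are bounded only so the demo ends, and the `dispatcher` argument is unused here.

```python
import asyncio

background_tasks = set()


def spawn(coro):
    """Create a task and hold a strong reference to it until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def loop_fun(name, log, interval=0.01, rounds=3):
    # Bounded for the demo; a real bot would use `while True` here.
    for _ in range(rounds):
        log.append(name)
        await asyncio.sleep(interval)


async def on_startup_launch(dispatcher, log):
    spawn(loop_fun("FUN1", log))
    spawn(loop_fun("FUN2", log))


async def demo():
    log = []
    await on_startup_launch(None, log)
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)  # give pending done-callbacks a chance to run
    return log, len(background_tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The done-callback removes each task from the set once it completes, so the set does not grow without bound.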

Stuckey answered 25/5, 2023 at 9:57 Comment(0)

For aiogram 3.14, the loop is no longer created by aiogram, so this works:


import asyncio

from mymodule.core import bot, dp, logger
from mymodule.handlers import router

async def periodic_task(period: int):
    try:
        while True:
            print("Running periodic task")
            await asyncio.sleep(period)
    except asyncio.CancelledError:
        # Perform any cleanup if needed
        logger.info(f"Periodic task (period={period}) canceled")
        raise  # re-raise so the task is actually marked as cancelled

async def main():
    asyncio.create_task(periodic_task(1))
    asyncio.create_task(periodic_task(3))

    dp.include_router(router)
    await dp.start_polling(bot)

if __name__ == '__main__':
    asyncio.run(main())
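
asyncio.run cancels any tasks that are still pending when main returns. Cancelling them explicitly in a finally block makes the shutdown order visible and lets main wait for their cleanup. Below is a minimal sketch without aiogram. `fake_polling` is a hypothetical stand-in for `await dp.start_polling(bot)`, and `events` only records what happened.

```python
import asyncio


async def periodic_task(period, events):
    try:
        while True:
            events.append(f"tick {period}")
            await asyncio.sleep(period)
    except asyncio.CancelledError:
        events.append(f"cleanup {period}")  # release resources here
        raise  # re-raise so the task is marked as cancelled


async def fake_polling():
    # Stand-in for `await dp.start_polling(bot)`; returns after a short while.
    await asyncio.sleep(0.05)


async def main():
    events = []
    tasks = [
        asyncio.create_task(periodic_task(0.01, events)),
        asyncio.create_task(periodic_task(0.03, events)),
    ]
    try:
        await fake_polling()
    finally:
        for task in tasks:
            task.cancel()
        # Wait for the cancellations; exceptions are returned, not raised.
        await asyncio.gather(*tasks, return_exceptions=True)
    return events, [task.cancelled() for task in tasks]


if __name__ == "__main__":
    print(asyncio.run(main()))
```
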
Rozele answered 8/11, 2024 at 12:32 Comment(0)
