How to use asyncio for asynchronous handler?

Problem Description:

I have a function that constantly yields objects, say one per second, and a handler that takes 2 seconds to process each of these objects. For example:


from time import sleep
import asyncio
from datetime import datetime

def generator():
    i = 0
    while True:
        yield i
        i += 1
        sleep(1)

def handler(number):
    sleep(2)
    if number % 2 == 0:
        print(str(number) + ' is even')
    else:
        print(str(number) + ' is odd')


for number in generator():
    handler(number)

So, for example, ‘2 is even’ is printed 6 seconds after the program starts. How do I reduce this time to 4 seconds (2 seconds for the generator + 2 seconds for the handler) using asyncio? I need to set up asynchronous handling of the numbers.

Solution – 1

You need a couple of changes here:

  1. Your generator is currently a plain generator. Change it to an asynchronous generator so that you can iterate over it with async for. This way it can give control back to the event loop.

  2. Use the async version of sleep from the asyncio library: asyncio.sleep. time.sleep blocks the whole thread, so it doesn’t cooperate with other tasks.

  3. Change your synchronous handler function to a coroutine.

import asyncio


async def generator():
    i = 0
    while True:
        yield i
        i += 1
        await asyncio.sleep(1)


async def handler(number):
    await asyncio.sleep(2)
    if number % 2 == 0:
        print(str(number) + " is even")
    else:
        print(str(number) + " is odd")


async def main():
    async for number in generator():
        asyncio.create_task(handler(number))


asyncio.run(main())

Now your first task is main(); asyncio.run() automatically wraps it in a Task. While this task is running, it iterates asynchronously over generator(). For each number received, you create a new Task from the handler coroutine.

This way the sleep times overlap. While main() waits 1 second for the next number, the handler() task created for the previous number is already sleeping as well. So when the next number arrives, 1 second of that handler() task has already passed and it needs only 1 more second to finish.
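To check the overlap described above, here is a minimal timing sketch. It is not part of the original answer: the TICK constant (one "second" of the example, scaled down so the demo finishes quickly), the limit parameter and the finished dictionary are all assumptions added for illustration. With overlapping sleeps, the numbers 0, 1 and 2 should be handled after roughly 2, 3 and 4 ticks.

```python
import asyncio
import time

TICK = 0.1  # assumption: stands in for 1 second of the original example


async def generator(limit):
    # Finite variant of the answer's generator so the demo terminates.
    for i in range(limit):
        yield i
        await asyncio.sleep(TICK)


async def handler(number, start, finished):
    await asyncio.sleep(2 * TICK)
    # Record when this number was handled, measured in ticks since start.
    finished[number] = (time.monotonic() - start) / TICK


async def main(limit=3):
    start = time.monotonic()
    finished = {}
    tasks = []
    async for number in generator(limit):
        tasks.append(asyncio.create_task(handler(number, start, finished)))
    # Wait for the handlers that are still sleeping.
    await asyncio.gather(*tasks)
    return finished


finished = asyncio.run(main())
for number, ticks in sorted(finished.items()):
    print(f"{number} handled after about {ticks:.1f} ticks")
```

Setting TICK = 1 reproduces the timings of the original example.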

You can see the number of tasks if you want:

async def main():
    async for number in generator():
        print(f"Number of all tasks: {len(asyncio.all_tasks())}")
        asyncio.create_task(handler(number))

Because each handler sleeps for 2 seconds and your number generator sleeps for 1 second, you will see that, after the first iteration, about 2 tasks exist in the event loop on every iteration (the count from asyncio.all_tasks() includes main() itself). Change await asyncio.sleep(1) to await asyncio.sleep(0.5) in the generator coroutine and, once the first few iterations have passed, you will see about 4 tasks on every iteration.
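One caveat about the fire-and-forget asyncio.create_task() calls: the asyncio documentation recommends saving a reference to the returned task, because the event loop only keeps weak references to tasks. A possible sketch of that pattern follows. The names background_tasks, results, TICK and limit are assumptions for illustration, and the generator is made finite so the demo ends.

```python
import asyncio

TICK = 0.05  # assumption: scaled-down stand-in for 1 second


async def generator(limit):
    # Finite variant of the answer's generator.
    for i in range(limit):
        yield i
        await asyncio.sleep(TICK)


async def handler(number, results):
    await asyncio.sleep(2 * TICK)
    results.append(number)


async def main(limit=4):
    background_tasks = set()  # strong references to the running tasks
    results = []
    async for number in generator(limit):
        task = asyncio.create_task(handler(number, results))
        background_tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(background_tasks.discard)
    # Wait for any handlers that are still running.
    await asyncio.gather(*background_tasks)
    return results


results = asyncio.run(main())
print(results)
```

With an endless generator, as in the answer, the final gather is never reached, but the set still keeps each pending task alive until it completes.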


Answer to the comment:

Can I do the same thing if I use API which doesn’t let me create
asynchronous generator, but just a normal one? Can I still
asynchronously handle yielded objects?

Yes, you can. Just note that without an asynchronous generator you can’t use async for, which means your iteration is synchronous. You also need a little trick for it to work. While your main() task is running, it keeps getting values from the generator and creating a Task for each one, but it never gives the other tasks a chance to run. You need await asyncio.sleep(0) to yield control to the event loop. Keep in mind that time.sleep(1) inside the generator still blocks the event loop, so the handler tasks only make progress each time main() reaches await asyncio.sleep(0):

import asyncio
import time


def generator():
    i = 0
    while True:
        yield i
        i += 1
        time.sleep(1)


async def handler(number):
    await asyncio.sleep(2)
    if number % 2 == 0:
        print(str(number) + " is even")
    else:
        print(str(number) + " is odd")


async def main():
    for number in generator():
        print(f"Number of all tasks: {len(asyncio.all_tasks())}")
        asyncio.create_task(handler(number))
        await asyncio.sleep(0)


asyncio.run(main())
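As an alternative to await asyncio.sleep(0), the blocking next() call can be moved to a worker thread so the event loop stays free while the generator sleeps. This is a sketch under assumptions, not the original answer's method: it relies on asyncio.to_thread (Python 3.9+), and the _DONE sentinel, TICK, limit and results names are hypothetical. It also assumes the generator is safe to resume from a worker thread, which holds here because only one next() call is in flight at a time.

```python
import asyncio
import time

TICK = 0.05       # assumption: scaled-down stand-in for 1 second
_DONE = object()  # sentinel returned by next() when the generator is exhausted


def generator(limit):
    # Blocking generator, as with an API that offers no async variant.
    for i in range(limit):
        yield i
        time.sleep(TICK)


async def handler(number, results):
    await asyncio.sleep(2 * TICK)
    results.append(number)


async def main(limit=4):
    results = []
    tasks = []
    gen = generator(limit)
    while True:
        # next() runs in a worker thread; the event loop keeps running handlers.
        number = await asyncio.to_thread(next, gen, _DONE)
        if number is _DONE:
            break
        tasks.append(asyncio.create_task(handler(number, results)))
    await asyncio.gather(*tasks)
    return results


results = asyncio.run(main())
print(results)
```

Passing a default to next() avoids raising StopIteration inside the worker thread, which does not propagate cleanly through a future.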