Basics of Asyncio in Python (#dev #python #asyncio #concurrency #tutorial)


Overview

“A dev solved a problem by implementing an async solution. Now. have. a. doesn’t. he. anymore. problem.” - Ancient proverb on X (formerly Twitter). In this post, we’ll go through the basics of asyncio in Python, just enough so you can hit the ground running.

What is asyncio?

From their site:

asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.

TL;DR: asyncio is a package that allows you to run two or more things at once.

Basic concepts

Event Loop

The event loop is the core of asyncio. It is a loop that runs tasks and callbacks, and it is responsible for scheduling and running the code. Think of it as what coordinates the execution of your async code. It knows all the tasks that are pending and when they should be executed.

If a task is in the loop (being executed), but it’s waiting for something to happen (like a network request), the loop will pause the task and run another one. When the awaited event happens, the loop will resume the task.
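To make this more concrete, here is a minimal sketch (it uses asyncio.gather, which we’ll cover later in this post). While coroutine A is sleeping, the loop runs coroutine B, so B finishes first even though A was started first:

import asyncio

async def worker(name: str, delay: int):
    print(f"{name}: waiting {delay}s...")
    # While this coroutine is sleeping, the event loop runs the other one
    await asyncio.sleep(delay)
    print(f"{name}: done!")

async def main():
    # Both coroutines run on the same event loop, interleaved
    await asyncio.gather(worker("A", 2), worker("B", 1))

if __name__ == "__main__":
    asyncio.run(main())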

Coroutines

Coroutines are functions that can pause and resume their execution. They are defined with the async def syntax. When you call a coroutine, it returns a coroutine object, which you can then pass to the event loop to be executed.

import asyncio

async def main():
    print("Hello, world from a coroutine!")
    
if __name__ == "__main__":
    # Runs the coroutine
    asyncio.run(main())

When you call asyncio.run(), it creates an event loop, runs the coroutine, and closes the loop when the coroutine finishes. In our case, it finishes right after printing “Hello, world from a coroutine!”, since we’re not doing anything else in the loop or waiting for anything.

One thing that is worth noting is that in the line asyncio.run(main()), it seems like we’re executing the code inside the function, but we’re not. When the (coroutine) function main is called, it returns an object of type Coroutine, which will be used by the event loop to execute the code inside the function.

This can be a bit confusing at first, because usually, when you pass a callback, you don’t execute it right away. But in this case, we are generating the coroutine object so that the event loop will know what to do.
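You can see this for yourself with a minimal sketch (reusing the main coroutine from above): calling main() only creates the coroutine object, and nothing is printed until the event loop actually runs it.

import asyncio

async def main():
    print("Hello, world from a coroutine!")

coro = main()       # nothing is printed yet; we only created a coroutine object
print(type(coro))   # <class 'coroutine'>
asyncio.run(coro)   # now the event loop runs it and the message is printed

If you create a coroutine object and never run or await it, Python warns you with a “coroutine 'main' was never awaited” RuntimeWarning.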

Await

The await keyword pauses the execution of a coroutine until the awaited operation finishes, much like async/await in other languages. The difference is that in Python, you can only use await inside a coroutine.

import asyncio

async def main():
    print("Hello, world from a coroutine!")
    await asyncio.sleep(1)
    print("Hello, world after 1 second!")
    
if __name__ == "__main__":
    asyncio.run(main())

In the example above, we’re using await asyncio.sleep(1) to pause the execution of the coroutine for 1 second. After that, the execution will continue, and the second print statement will be executed.

You can also await other coroutines:

import asyncio
import time

async def fetch_products():
    # Pretend we're fetching a lot of products from an API or database...
    await asyncio.sleep(5)
    print("Fetched ALL the products!")
    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    
    # Calling the coroutine function only creates a coroutine object;
    # nothing inside fetch_products runs yet
    coro = fetch_products()
    
    products = await coro
    
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In this example, we’re calling the fetch_products coroutine and awaiting its result. The fetch_products coroutine will pause for 5 seconds, and then it will print “Fetched ALL the products!”. After that, the main coroutine will continue its execution.

Note that in the main coroutine, we first call fetch_products() without the await keyword, which only creates a coroutine object. At that point, none of the code inside fetch_products has run yet. Only when we await the coroutine does the event loop execute it. If we never await it, the code inside fetch_products will never run.

Another important thing to note is that when we await the coroutine object, we don’t call it again (there is no await coro()). Internally, the event loop uses that coroutine object to execute the code; you’re just telling it that you want the result at that point in your program.

Tasks

Tasks are used to schedule coroutines to be run by the event loop. With tasks, you can run multiple coroutines at the same time.

import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Creating tasks...
    task1 = asyncio.create_task(fetch_products(1))
    task2 = asyncio.create_task(fetch_products(2))
    task3 = asyncio.create_task(fetch_products(3))
    task4 = asyncio.create_task(fetch_products(4))
    
    # Waiting for all tasks to finish...
    result1 = await task1
    result2 = await task2
    result3 = await task3
    result4 = await task4

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In the example above, we’re creating 4 different tasks, each one fetching products from a different page, simulating that we’re fetching products from a paginated API.

You might be wondering: From what I said in the previous section, this code will execute every task one after the other, right?

Well, not really. The big difference is that we’re using asyncio.create_task to create the tasks. So instead of awaiting plain coroutine objects, we’re creating tasks that the event loop starts running as soon as possible. Whenever one task is sleeping, the event loop runs another one. So they run concurrently, and the main coroutine only finishes when all the tasks are done. Pretty neat, right? We didn’t need to create any special controls to achieve this.

Note: You can also cancel tasks, but I don’t think that falls into the “basics” category. I’ll leave a link to this topic in the documentation at the end of the post.

Gather

The asyncio.gather function runs multiple coroutines at the same time, and returns their results after all of them have finished.

import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    results = await asyncio.gather(
        fetch_products(1),
        fetch_products(2),
        fetch_products(3),
        fetch_products(4)
    )

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In this example, we didn’t need to call asyncio.create_task to create the tasks; instead, we’re “telling” asyncio to collect the results of all the coroutines passed to asyncio.gather and return them in a list.

The order of the results will be the same as the order of the coroutines passed to asyncio.gather, however, this does not mean that the coroutines will finish in the same order. It’s just the position of the result in the results list that is guaranteed to be the same as the position of the coroutine in the asyncio.gather call.
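Here is a tiny sketch of that guarantee (the delays are chosen so the pages finish in reverse order): the coroutines complete out of order, but the results list still matches the order of the arguments.

import asyncio

async def fetch_page(page: int, delay: int):
    await asyncio.sleep(delay)
    print(f"page {page} finished")
    return page

async def main():
    # page 1 takes the longest, so it finishes last...
    results = await asyncio.gather(
        fetch_page(1, 3),
        fetch_page(2, 2),
        fetch_page(3, 1),
    )
    # ...but the results still come back as [1, 2, 3]
    print(results)

if __name__ == "__main__":
    asyncio.run(main())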

One noteworthy thing is that if one coroutine raises an exception, asyncio.gather won’t cancel the other coroutines. By default, it propagates the first exception to the code awaiting the gather call, while the remaining coroutines keep running.
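Here’s a minimal sketch of that behaviour (failing_fetch is a made-up coroutine that just forces an error). With the default return_exceptions=False, the first exception is re-raised at the await; if you pass return_exceptions=True, the exceptions are returned in the results list instead:

import asyncio

async def ok_fetch(page: int):
    await asyncio.sleep(2)
    return {"page": page}

async def failing_fetch(page: int):
    await asyncio.sleep(1)
    raise RuntimeError(f"page {page} failed")

async def main():
    # With return_exceptions=True, exceptions show up in the results list
    # instead of being raised at the await
    results = await asyncio.gather(
        ok_fetch(1),
        failing_fetch(2),
        return_exceptions=True,
    )
    print(results)  # [{'page': 1}, RuntimeError('page 2 failed')]

if __name__ == "__main__":
    asyncio.run(main())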

Task Groups

Task groups are an alternative to asyncio.gather that also lets you run multiple coroutines concurrently and collect their results. According to the documentation, they provide a stronger safety guarantee than asyncio.gather.

The main difference between asyncio.TaskGroup and asyncio.gather is that if any task in the group fails, the TaskGroup cancels the remaining tasks and then raises the exception(s). This is useful if you want to stop the execution of all coroutines as soon as one of them fails.

import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    product_pages_to_fetch = 5

    async with asyncio.TaskGroup() as tg:
        for i in range(product_pages_to_fetch):
            tasks.append(tg.create_task(fetch_products(i+1)))

    results = [task.result() for task in tasks]

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

This is a slightly different approach from the previous example. First we create a task group context (async with asyncio.TaskGroup() as tg:), then we create the tasks inside the context by calling tg.create_task. Note that the create_task method is called on tg (the task group we created), and not on asyncio!

After that, we don’t have to explicitly await anything. The context we created (with the async with keywords) does that for us. This means that by the time we reach the next statement (results = [task.result() for task in tasks]), all the tasks have finished and we can collect their results.

If one or more tasks fail, the context manager will combine the exceptions into an ExceptionGroup (or BaseExceptionGroup) and raise it. The exceptions to this rule are KeyboardInterrupt and SystemExit, which (if they happen) are re-raised on their own instead of being wrapped in the ExceptionGroup. (Again, I’m not going to go into details about this, but I’ll leave a link in the references section.)
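If you want to handle those failures, here is a minimal sketch (failing_fetch is again a made-up coroutine, just to trigger the behaviour) using the except* syntax that was introduced in Python 3.11 alongside TaskGroup:

import asyncio

async def ok_fetch(page: int):
    await asyncio.sleep(5)
    return {"page": page}

async def failing_fetch(page: int):
    await asyncio.sleep(1)
    raise ValueError(f"page {page} failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(ok_fetch(1))       # cancelled when the other task fails
            tg.create_task(failing_fetch(2))
    except* ValueError as eg:
        # eg is an ExceptionGroup containing the ValueError(s) raised in the group
        for exc in eg.exceptions:
            print(f"task failed: {exc}")

if __name__ == "__main__":
    asyncio.run(main())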

Synchronization: Locks, Semaphores, and Events

So far we’ve seen tasks/coroutines that execute some “standalone” code, meaning they don’t depend on each other, nor do they rely on any shared resources. But what if you’re really lucky and, in your project, the tasks need to access some common/shared data?

Lock

A lock is a synchronization primitive that allows only one task to access a shared resource at a time. If a task tries to acquire a lock that another task is already holding, it will wait until the lock is released.

import asyncio
import random
import time

# This will be shared between the tasks
products_fetched = 0

# this will control the access to the shared resource
lock = asyncio.Lock()

async def fetch_products():
    global products_fetched
    async with lock:
        print("Fetching products...")
        await asyncio.sleep(1)
        products_fetched += random.randint(1, 100)
        print(f"Products fetched: {products_fetched}")


async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5

    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products()))

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In this example, the code inside the main coroutine is pretty trivial (at this point). What we’re interested in is the code inside the fetch_products coroutine. In it, we’re creating an async context using async with lock:, and when we do this, we’re telling asyncio that only one task can access the code inside the context at a time.

How does this help you? Well, since only one task at a time can run the code inside the lock context, this means that we’ll never have two tasks trying to access our shared resource (products_fetched) at the same time.

Now, be mindful of the amount of code you place inside those contexts, because if you have a lot of code that needs to be executed inside the lock, you might end up with a bottleneck.
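For example, here is a minimal sketch based on the fetch_products coroutine above: do the slow, independent work outside the lock and only hold the lock for the quick shared-state update.

import asyncio
import random

products_fetched = 0
lock = asyncio.Lock()

async def fetch_products():
    global products_fetched

    # Slow, independent work happens outside the lock...
    print("Fetching products...")
    await asyncio.sleep(1)
    fetched = random.randint(1, 100)

    # ...and the lock is only held for the quick shared-state update
    async with lock:
        products_fetched += fetched
        print(f"Products fetched: {products_fetched}")

async def main():
    async with asyncio.TaskGroup() as tg:
        for _ in range(5):
            tg.create_task(fetch_products())

if __name__ == "__main__":
    asyncio.run(main())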

This lock works in a similar way to the lock keyword in C#. If you’re familiar with that, you’ll feel right at home.

Semaphore

Similar to a lock, the semaphore is a synchronization primitive that allows a fixed number of tasks to access a shared resource.

The usage is pretty similar:

import asyncio
import time

async def fetch_products(semaphore):
    async with semaphore:
        print("Fetching products...")
        await asyncio.sleep(1)
        print("Products fetched!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5
    tasks_concurrently_doing_something = 2

    semaphore = asyncio.Semaphore(tasks_concurrently_doing_something)

    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products(semaphore)))

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In this example, we’re creating a semaphore with a limit of 2 tasks that can be doing the work at the same time. This is useful for things like limiting the number of requests to an API, or the number of simultaneous connections to a database. For instance, if you want to make 10,000 requests to an API, but never more than 100 at a time, you could set the variable tasks_concurrently_doing_something to 100.
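As a sketch of that idea (fetch_page here is a made-up stand-in for a real API call), you could combine a semaphore with asyncio.gather to make 10,000 requests with at most 100 in flight at a time:

import asyncio

MAX_CONCURRENT_REQUESTS = 100
TOTAL_REQUESTS = 10_000

semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

async def fetch_page(page: int):
    # At most MAX_CONCURRENT_REQUESTS coroutines run this block at a time
    async with semaphore:
        await asyncio.sleep(0.1)  # pretend this is the real HTTP request
        return {"page": page}

async def main():
    results = await asyncio.gather(
        *(fetch_page(page) for page in range(TOTAL_REQUESTS))
    )
    print(f"fetched {len(results)} pages")

if __name__ == "__main__":
    asyncio.run(main())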

Event

An Event is a synchronization primitive that allows one task to signal other tasks that something has happened. This is pretty useful in cases where you have concurrent tasks, but a part of task-A depends on task-B finishing.

Example: You have a task that updates the product promotions in your database according to some business rules, and another task that will email the customers about the new promotions. In this case, you want to make sure that the emails will only be sent after the promotions are updated.

import asyncio
import time

async def update_promotions(event):
    print("Updating promotions...")
    await asyncio.sleep(2)
    print("Promotions updated!")
    event.set()
    
async def send_emails(event):
    print("Fetching users from the database...")
    print("Deciding which users will receive the email...")
    print("Waiting for promotions to be updated...")
    await event.wait()
    print("Fetching promotions...")
    await asyncio.sleep(1)
    print("Sending emails...")
    await asyncio.sleep(1)

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(update_promotions(event)))
        tasks.append(tg.create_task(send_emails(event)))

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

This is a pretty simple way to solve some dependencies between tasks. Granted, we should probably try to avoid this kind of dependency, but sometimes it is what it is, and we have to deal with it.

Running in Threads

So far, all the examples we’ve seen are running in the same thread. Concurrently, but in the same thread. But what if you have a task that is blocking the event loop?

To solve this, we can tell asyncio to run a specific task in a separate thread:

import asyncio
import time

def blocking_task():
    print("Blocking task started...")
    time.sleep(5)
    print("Blocking task finished!")

async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []

    async with asyncio.TaskGroup() as tg:
        threaded_task = asyncio.to_thread(blocking_task)
        tasks.append(tg.create_task(threaded_task))
        tasks.append(tg.create_task(non_blocking_task()))

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

In this example, the blocking_task() function will, as the name suggests, block the event loop for 5 seconds. To avoid this, we’re using asyncio.to_thread to run this task in a separate thread. This way, the event loop will not be blocked.

This blocking operation could be anything; in this case, we’re just using a plain time.sleep(5) to keep the example simple.

Note that the blocking_task function is not a coroutine (there is no async keyword in its definition), but we can still run it in a separate thread using asyncio.to_thread.

If you want the blocking_task function to be a coroutine, we can switch things around a bit. Look at this new example:

import asyncio
import time


async def blocking_task():
    print("Blocking task started...")
    await asyncio.to_thread(time.sleep, 5)
    print("Blocking task finished!")


async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")


async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []

    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(blocking_task()))
        tasks.append(tg.create_task(non_blocking_task()))

    print(f"finished at {time.strftime('%X')}")


if __name__ == "__main__":
    asyncio.run(main())

The main difference in this new snippet is that blocking_task is now a coroutine, and inside it we’re using await asyncio.to_thread only on the time.sleep(5) call. This way, we’re telling asyncio to run only this part of the code in a separate thread.

Conclusion

Alright, this post is already pretty big, but I think it covers the basics of what this package can do, and it’s still just the tip of the iceberg when it comes to asyncio. If you want to dive deeper into what the package offers, I highly recommend checking the documentation.

I hope that helps! :)

References

Topics we didn’t cover

Translations: