Basics of Asyncio in Python (#dev #python #asyncio #concurrency #tutorial)
Overview
“A dev solved a problem by implementing an async solution. Now. have. a. doesn’t. he. anymore. problem.” - Ancient proverb on X Twitter.
In this post, we’ll go through the basics of asyncio in Python, just enough so you can hit the ground running.
What is asyncio?
From their site:
asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
TL;DR: asyncio is a package that allows you to run two or more things concurrently.
Basic concepts
Event Loop
The event loop is the core of asyncio. It is a loop that runs tasks and callbacks, and it is responsible for scheduling and running the code. Think of it as what coordinates the execution of your async code. It knows all the tasks that are pending and when they should be executed.
If a task is in the loop (being executed), but it’s waiting for something to happen (like a network request), the loop will pause the task and run another one. When the awaited event happens, the loop will resume the task.
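To make that concrete, here is a minimal sketch (not from the original text, and it uses asyncio.gather, which we’ll cover later in this post) with two made-up coroutines. While one of them is paused on asyncio.sleep, the loop runs the other, so their prints interleave:

import asyncio

async def boil_water():
    print("Putting the kettle on...")
    await asyncio.sleep(2)  # while we wait, the event loop runs something else
    print("Water is boiling!")

async def slice_bread():
    print("Slicing bread...")
    await asyncio.sleep(1)
    print("Bread is sliced!")

async def main():
    # Both coroutines run on the same event loop; the awaits let it switch between them
    await asyncio.gather(boil_water(), slice_bread())

if __name__ == "__main__":
    asyncio.run(main())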
Coroutines
Coroutines are functions that can pause and resume their execution. They are defined with the async def syntax. When you call a coroutine function, it returns a coroutine object, which you can then pass to the event loop to be executed.
import asyncio

async def main():
    print("Hello, world from a coroutine!")

if __name__ == "__main__":
    # Runs the coroutine
    asyncio.run(main())
When you call asyncio.run(), it creates an event loop, runs the coroutine, and closes the loop when the coroutine finishes. In our case, it finishes right after printing “Hello, world from a coroutine!”, since we’re not doing anything else in a loop or waiting for anything.
One thing that is worth noting is that in the line asyncio.run(main()), it seems like we’re executing the code inside the function right there, but we’re not. When the (coroutine) function main is called, it returns a coroutine object, which will be used by the event loop to execute the code inside the function.
This can be a bit confusing at first, because usually, when you pass a callback, you don’t execute it right away. But in this case, we are generating the coroutine object so that the event loop will know what to do.
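A quick way to convince yourself of this: the sketch below (not part of the original post) calls main() and inspects what comes back before handing it to asyncio.run:

import asyncio

async def main():
    print("Hello from inside the coroutine!")

coro = main()      # nothing is printed here; coro is just a coroutine object
print(type(coro))  # <class 'coroutine'>
asyncio.run(coro)  # only now is the body executed and the message printed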
Await
The await keyword is used to pause the execution of a coroutine until a certain condition is met, just like you would use it in other programming languages that support it. The difference is that in Python, you can only use await inside a coroutine.
import asyncio

async def main():
    print("Hello, world from a coroutine!")
    await asyncio.sleep(1)
    print("Hello, world after 1 second!")

if __name__ == "__main__":
    asyncio.run(main())
In the example above, we’re using await asyncio.sleep(1) to pause the execution of the coroutine for 1 second. After that, the execution will continue, and the second print statement will be executed.
You can also await other coroutines:
import asyncio
import time

async def fetch_products():
    # Pretend we're fetching a lot of products from an API or database...
    await asyncio.sleep(5)
    print("Fetched ALL the products!")
    return {"products": ["product1", "product2", "product3"]}

async def main():
    print(f"started at {time.strftime('%X')}")
    task = fetch_products()
    products = await task
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In this example, we’re calling the fetch_products coroutine and awaiting its result. The fetch_products coroutine will pause for 5 seconds, and then it will print “Fetched ALL the products!”. After that, the main coroutine will continue its execution.
Note that in the main coroutine, we’re calling fetch_products without the await keyword, which creates a coroutine object (assigned to the variable task here), not a running task. At this point in time, the code inside fetch_products is not being executed. Only when we await it will the code be executed. If we never await it, the code inside fetch_products will never run.
Another important thing to note is that when we await it, we don’t need to call it again (like await task()). Internally, that coroutine object will be used by the event loop to execute the code. You are just telling the event loop that you will wait for the result at that point in your program.
Tasks
Tasks are used to schedule coroutines to be run by the event loop. With tasks, you can run multiple coroutines at the same time.
import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")
    return {"products": ["product1", "product2", "product3"]}

async def main():
    print(f"started at {time.strftime('%X')}")
    # Creating tasks...
    task1 = asyncio.create_task(fetch_products(1))
    task2 = asyncio.create_task(fetch_products(2))
    task3 = asyncio.create_task(fetch_products(3))
    task4 = asyncio.create_task(fetch_products(4))
    # Waiting for all tasks to finish...
    result1 = await task1
    result2 = await task2
    result3 = await task3
    result4 = await task4
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In the example above, we’re creating 4 different tasks, each one fetching products from a different page, simulating that we’re fetching products from a paginated API.
You might be wondering: based on what I said in the previous section, won’t this code execute every task one after the other?
Well, not really. The big difference is that we’re using asyncio.create_task to create the tasks. So we’re not awaiting plain coroutines, we’re awaiting tasks that the event loop starts running as soon as possible. In this case, whenever a task is sleeping, the event loop will run another one. So they will run concurrently, and the main coroutine will only finish when all tasks are done. Pretty neat, right? We didn’t need to create any special controls to achieve this.
Note: You can also cancel tasks, but I don’t think this falls into the “basics” category. I’ll leave a link to this topic in the documentation at the end of the post.
Gather
The asyncio.gather function runs multiple coroutines at the same time and returns their results after all of them have finished.
import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")
    return {"products": ["product1", "product2", "product3"]}

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    results = await asyncio.gather(
        fetch_products(1),
        fetch_products(2),
        fetch_products(3),
        fetch_products(4)
    )
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In this example, we didn’t need to call asyncio.create_task to create the tasks; instead, we are “telling” asyncio that it must collect the results of all the coroutines passed to asyncio.gather and return them in a list.
The order of the results will be the same as the order of the coroutines passed to asyncio.gather. However, this does not mean that the coroutines will finish in that order. It’s just the position of each result in the results list that is guaranteed to match the position of its coroutine in the asyncio.gather call.
One noteworthy thing is that if one coroutine raises an error, asyncio.gather won’t stop the execution of the other coroutines. By default, it propagates that first error to the code awaiting the gather, while the remaining coroutines keep running; if you pass return_exceptions=True, exceptions are collected in the results list instead of being raised.
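As a small illustration of that behaviour (this snippet and its ok/boom coroutines are made up for this post), passing return_exceptions=True makes the exception show up in the results list instead of being raised:

import asyncio

async def ok(n: int):
    await asyncio.sleep(0.1)
    return n

async def boom():
    await asyncio.sleep(0.1)
    raise ValueError("something went wrong")

async def main():
    # The failing coroutine's exception is returned alongside the other results
    results = await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)
    print(results)  # [1, ValueError('something went wrong'), 3]

if __name__ == "__main__":
    asyncio.run(main())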
Task Groups
Task groups are an alternative to asyncio.gather that allows you to run multiple coroutines concurrently and collect their results. According to the documentation, this provides a stronger safety guarantee than asyncio.gather.
The main difference between asyncio.TaskGroup and asyncio.gather is that when one task in a TaskGroup fails, the remaining tasks in the group are cancelled and the exception is re-raised when the group exits. This is useful if you want to stop the execution of all coroutines if one of them fails.
import random
import asyncio
import time

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")
    return {"products": ["product1", "product2", "product3"]}

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    product_pages_to_fetch = 5
    async with asyncio.TaskGroup() as tg:
        for i in range(product_pages_to_fetch):
            tasks.append(tg.create_task(fetch_products(i + 1)))
    results = [task.result() for task in tasks]
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
This is a slightly different approach to the previous example. First we create a task group context (async with asyncio.TaskGroup() as tg:), then we create the tasks inside the context by calling tg.create_task. Note that the create_task method is called on tg (the task group we created), and not on asyncio!
After that, we don’t have to explicitly await anything. The context we created (with the async with keywords) will do that for us. This means that when we reach the next statement (results = [task.result() for task in tasks]), all the tasks will have finished and we can collect their results.
If one or more tasks fail, the context will combine the exceptions into an ExceptionGroup (or BaseExceptionGroup) and raise it. The exceptions to this rule are KeyboardInterrupt and SystemExit, which (if they happen) will be raised instead of the ExceptionGroup. (Again, I’m not going to go into details about this, but I’ll leave a link in the references section.)
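If you’re curious what handling that looks like, here is a minimal sketch (the might_fail coroutine is hypothetical) that catches the ExceptionGroup raised by a failing task group using the except* syntax introduced in Python 3.11:

import asyncio

async def might_fail(n: int):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            for n in range(4):
                tg.create_task(might_fail(n))
    except* ValueError as eg:
        # eg is an ExceptionGroup containing the ValueError(s) raised inside the group
        print(f"caught {len(eg.exceptions)} error(s): {[str(e) for e in eg.exceptions]}")

if __name__ == "__main__":
    asyncio.run(main())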
Synchronization: Locks, Semaphores, and Events
So far we’ve seen tasks/coroutines that executed some “standalone” code, meaning that they don’t depend on each other, nor do they rely on shared resources. But what if you’re really lucky and, in your project, the tasks need to access some common/shared data?
Lock
A lock is a synchronization primitive that allows only one task to access a shared resource at a time. If a task tries to acquire a lock that another task is already holding, it will wait until the lock is released.
import asyncio
import random
import time

# This will be shared between the tasks
products_fetched = 0
# This will control the access to the shared resource
lock = asyncio.Lock()

async def fetch_products():
    global products_fetched
    async with lock:
        print("Fetching products...")
        await asyncio.sleep(1)
        products_fetched += random.randint(1, 100)
        print(f"Products fetched: {products_fetched}")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5
    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products()))
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In this example, the code inside the main coroutine is pretty trivial (at this point). What we’re interested in is the code inside the fetch_products coroutine. In it, we’re creating an async context using async with lock:, and when we do this, we’re telling asyncio that only one task can run the code inside that context at a time.
How does this help you? Well, since only one task at a time can run the code inside the lock context, we’ll never have two tasks trying to access our shared resource (products_fetched) at the same time.
Now, be mindful of the amount of code you place inside those contexts, because if you have a lot of code that needs to be executed inside the lock, you might end up with a bottleneck.
This lock works in a similar way to the lock keyword in C#. If you’re familiar with that, you’ll feel right at home.
Semaphore
Similar to a lock, the semaphore is a synchronization primitive that allows a fixed number of tasks to access a shared resource.
The usage is pretty similar:
import asyncio
import time

async def fetch_products(semaphore):
    async with semaphore:
        print("Fetching products...")
        await asyncio.sleep(1)
        print("Products fetched!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5
    tasks_concurrently_doing_something = 2
    semaphore = asyncio.Semaphore(tasks_concurrently_doing_something)
    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products(semaphore)))
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In this example, we’re creating a semaphore and setting a limit of 2 tasks that can be doing work at the same time. This would be useful for things like limiting the number of requests to an API, or the number of simultaneous connections to a database. For instance, if you want to make 10,000 requests to an API, but never more than 100 at a time, you could set the variable tasks_concurrently_doing_something to 100, as sketched below.
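Here is a minimal sketch of that idea (the fetch_page coroutine is a hypothetical stand-in for a real API call): only 100 of the 10,000 requests are allowed to be in flight at once.

import asyncio

# At most 100 requests allowed in flight at the same time
semaphore = asyncio.Semaphore(100)

async def fetch_page(page: int):
    # Hypothetical stand-in for a real HTTP request
    async with semaphore:
        await asyncio.sleep(0.01)
        return page

async def main():
    results = await asyncio.gather(*(fetch_page(p) for p in range(10_000)))
    print(f"fetched {len(results)} pages")

if __name__ == "__main__":
    asyncio.run(main())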
Event
An Event is a synchronization primitive that allows one task to signal other tasks that something has happened. This is pretty useful in cases where you have concurrent tasks, but a part of task A depends on task B finishing.
Example: You have a task that updates the product promotions in your database according to some business rules, and another task that will email the customers about the new promotions. In this case, you want to make sure that the emails will only be sent after the promotions are updated.
import asyncio
import time

async def update_promotions(event):
    print("Updating promotions...")
    await asyncio.sleep(2)
    print("Promotions updated!")
    event.set()

async def send_emails(event):
    print("Fetching users from the database...")
    print("Deciding which users will receive the email...")
    print("Waiting for promotions to be updated...")
    await event.wait()
    print("Fetching promotions...")
    await asyncio.sleep(1)
    print("Sending emails...")
    await asyncio.sleep(1)

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    event = asyncio.Event()
    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(update_promotions(event)))
        tasks.append(tg.create_task(send_emails(event)))
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
This is a pretty simple way to solve some dependencies between tasks. Granted, we should probably try to avoid this type of dependency, but sometimes it is what it is, and we need to deal with it.
Running in Threads
So far, all the examples we’ve seen are running in the same thread. Concurrently, but in the same thread. But what if you have a task that is blocking the event loop?
To solve this, we can tell asyncio to run a specific task in a separate thread:
import asyncio
import time

def blocking_task():
    print("Blocking task started...")
    time.sleep(5)
    print("Blocking task finished!")

async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    async with asyncio.TaskGroup() as tg:
        threaded_task = asyncio.to_thread(blocking_task)
        tasks.append(tg.create_task(threaded_task))
        tasks.append(tg.create_task(non_blocking_task()))
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
In this example, the blocking_task() function will, as the name suggests, block the event loop for 5 seconds. To avoid this, we’re using asyncio.to_thread to run this task in a separate thread. This way, the event loop will not be blocked.
This blocking operation could be anything; in this case, we’re just using the regular time.sleep(5) to keep it simple.
Note that the blocking_task function is not a coroutine (there is no async keyword in its definition), but we can still run it in a separate thread using asyncio.to_thread.
If you want the blocking_task function to be a coroutine, we can switch things around a bit. Look at this new example:
import asyncio
import time

async def blocking_task():
    print("Blocking task started...")
    await asyncio.to_thread(time.sleep, 5)
    print("Blocking task finished!")

async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(blocking_task()))
        tasks.append(tg.create_task(non_blocking_task()))
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())
The main difference in this new snippet is that blocking_task is now a coroutine, and inside it we’re using await asyncio.to_thread only on the time.sleep(5) call. This way, we’re telling asyncio to run only this part of the code in a separate thread.
Conclusion
Alright, this post is already pretty big, but I think it covers the basics of what this package can do, and this is just the tip of the iceberg when it comes to asyncio. If you want to dive deep into what this package can do, I highly recommend checking the documentation.
I hope that helps! :)
References
- Asyncio documentation: https://docs.python.org/3/library/asyncio.html
- Event Loop: https://docs.python.org/3/library/asyncio-eventloop.html
- Coroutines: https://docs.python.org/3/library/asyncio-task.html#coroutines
- Awaitables: https://docs.python.org/3/library/asyncio-task.html#awaitables
- Await: https://docs.python.org/3/reference/expressions.html#await
- Task: https://docs.python.org/3/library/asyncio-task.html#asyncio.Task
- Create Task: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
- Gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
- Task Groups: https://docs.python.org/3/library/asyncio-task.html#task-groups
- Synchronization Primitives: https://docs.python.org/3/library/asyncio-sync.html#synchronization-primitives
- Lock: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock
- Semaphore: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore
- Event: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event
- Threads: https://docs.python.org/3/library/asyncio-task.html#running-in-threads
Topics we didn’t cover
- Task Cancelation: https://docs.python.org/3/library/asyncio-task.html#task-cancellation
- Exception Group: https://docs.python.org/3/library/exceptions.html#ExceptionGroup
- Base Exception Group: https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup
- Future: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future
- Condition: https://docs.python.org/3/library/asyncio-sync.html#condition
- Bounded Semaphore: https://docs.python.org/3/library/asyncio-sync.html#boundedsemaphore
- Barrier: https://docs.python.org/3/library/asyncio-sync.html#barrier