Tutorial básico do Asyncio com Python (#dev #python #asyncio #concurrency #tutorial)
Overview
“Um dev resolveu um problema implementando uma solução assíncrona. Agora. mais. problema. tem. não.” - Provérbio antigo no X Twitter.
Neste post vamos passar pelos conceitos básicos do asyncio em Python, apenas o suficiente para você começar a usar.
O que é asyncio?
De acordo com a documentação oficial:
asyncio é uma biblioteca para escrever código concorrente usando a sintaxe async/await. asyncio é usado como base para múltiplos frameworks assíncronos em Python que fornecem servidores web e de rede de alta performance, bibliotecas de conexão a bancos de dados, filas de tarefas distribuídas, etc.
TL;DR: asyncio é um pacote que permite executar duas ou mais coisas ao mesmo tempo.
Conceitos Básicos
Event Loop
O event loop é o ponto central do asyncio. É um loop que executa tarefas e callbacks, e é responsável por agendar e rodar o código. Pense nele como o coordenador da execução do seu código assíncrono. Ele sabe todas as tarefas que estão pendentes e quando elas devem ser executadas.
Se uma tarefa está no loop (sendo executada), mas está esperando que algo aconteça (como uma solicitação de rede), o loop irá pausar a tarefa e executar outra. Quando o evento esperado acontecer, o loop retomará a tarefa.
Coroutines
Coroutines são funções que podem pausar e retomar sua execução. Elas são definidas com a sintaxe async def
.
Quando você chama uma coroutine, ela retorna um objeto do tipo coroutine, que você pode passar para o event loop para
ser executado.
import asyncio
async def main():
print("Hello, world from a coroutine!")
if __name__ == "__main__":
# Runs the coroutine
asyncio.run(main())
Quando você chama asyncio.run()
, ele cria um loop de eventos, executa a coroutine e fecha o loop quando ela termina,
e no nosso caso, ela terminará após mostrar “Olá, mundo de uma coroutine!”, já que não estamos fazendo nada em um loop
ou esperando por algo.
Uma coisa que vale a pena notar é que nesta linha asyncio.run(main())
, parece que estamos executando o código dentro
do método, mas não é bem o caso. Quando a função (coroutine) main é chamada, ela retorna um objeto do tipo Coroutine
.
Esse objeto será usado pelo loop principal para executar o código dentro da função.
De início, isso pode ser um pouco confuso, já que normalmente, quando você passa um callback, você não o executa imediatamente. Mas, neste caso, estamos gerando o objeto coroutine para que o loop de eventos saiba o que fazer.
Await
A palavra-chave await
é usada para pausar a execução de uma coroutine até que uma certa condição seja atendida.
Assim como você faria em qualquer outra linguagem de programação. A diferença é que, em Python, você só pode usar
await
dentro de uma coroutine.
import asyncio
async def main():
print("Hello, world from a coroutine!")
await asyncio.sleep(1)
print("Hello, world after 1 second!")
if __name__ == "__main__":
asyncio.run(main())
No exemplo acima, estamos usando await asyncio.sleep(1)
para pausar a execução da coroutine por 1 segundo. Depois
disso, a execução continuará, e a segunda instrução print será executada.
Você também pode aguardar outras coroutines:
import asyncio
async def fetch_products():
# Pretend we're fetching a lot of products from an API or database...
await asyncio.sleep(5)
print("Fetched ALL the products!")
return {"products": ["product1", "product2", "product3"]}
async def main():
print(f"started at {time.strftime('%X')}")
task = fetch_products()
products = await task
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
Neste exemplo, estamos chamando a coroutine fetch_products
e aguardando seu resultado. A coroutine fetch_products
pausará por 5 segundos, e então imprimirá “Buscamos TODOS os produtos!”. Depois disso, a coroutine main
continuará
sua execução.
Importante observar que na coroutine main
, estamos chamando fetch_products
sem a palavra-chave await
, o que cria uma tarefa.
Nesse ponto, o código dentro de fetch_products
não está sendo executado. Somente quando aguardamos a tarefa, o código
será executado. Se nunca aguardarmos a tarefa, o código dentro de fetch_products
nunca será executado.
Outra coisa importante é que quando aguardamos a tarefa, não precisamos executá-la novamente (como await task()
).
Internamente, aquele objeto coroutine será usado pelo loop de eventos para executar o código. Você está apenas
informando ao loop de eventos que irá esperar pelo resultado naquele ponto do seu programa.
Tasks
As tasks são usadas para agendar coroutines para serem executadas pelo loop de eventos. Com as tasks, você pode executar várias coroutines simultaneamente.
import random
import asyncio
async def fetch_products(page: int):
# Pretend we're fetching a lot of products from an API or database...
# The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
delay = random.randint(2, 2 + page)
await asyncio.sleep(delay)
print(f"Fetched products from page {page} in {delay} seconds!")
return {"products": ["product1", "product2", "product3"]}
async def main():
print(f"started at {time.strftime('%X')}")
# Creating tasks...
task1 = asyncio.create_task(fetch_products(1))
task2 = asyncio.create_task(fetch_products(2))
task3 = asyncio.create_task(fetch_products(3))
task4 = asyncio.create_task(fetch_products(4))
# Waiting for all tasks to finish...
result1 = await task1
result2 = await task2
result3 = await task3
result4 = await task4
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
No exemplo acima, estamos criando 4 tasks diferentes, cada uma buscando produtos de uma página diferente, simulando que estamos buscando produtos de uma API paginada.
Você deve estar se perguntando: Pelo que eu disse na seção anterior, esse código vai executar cada task uma após a outra, certo?
Não exatamente. A grande diferença é que estamos usando asyncio.create_task
para criar as tasks. Então, não estamos
aguardando coroutines regulares, estamos criando tasks que serão executadas pelo loop de eventos o mais rápido possível.
Assim, neste caso, sempre que a task estiver dormindo, o loop de eventos executará outra task. Então, elas serão
executadas concorrentemente, e a coroutine principal só terminará quando todas as tasks estiverem concluídas.
Bem legal, né? Não precisávamos criar nenhum controle especial para conseguir isso.
Observação: Você também pode cancelar tasks, mas não acho que isso se enquadre na categoria “básico”. Vou deixar um link sobre esse tópico na documentação no final do post.
Gather
A função asyncio.gather
executa várias coroutines ao mesmo tempo e retorna seus resultados depois que todas elas
terminam.
import random
import asyncio
async def fetch_products(page: int):
# Pretend we're fetching a lot of products from an API or database...
# The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
delay = random.randint(2, 2 + page)
await asyncio.sleep(delay)
print(f"Fetched products from page {page} in {delay} seconds!")
return {"products": ["product1", "product2", "product3"]}
async def main():
print(f"started at {time.strftime('%X')}")
# Creating tasks...
task1 = asyncio.create_task(fetch_products(1))
task2 = asyncio.create_task(fetch_products(2))
task3 = asyncio.create_task(fetch_products(3))
task4 = asyncio.create_task(fetch_products(4))
# Waiting for all tasks to finish...
result1 = await task1
result2 = await task2
result3 = await task3
result4 = await task4
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
Neste exemplo, não precisamos chamar asyncio.create_task
para criar as tasks, mas estamos “dizendo” ao asyncio que ele
deve coletar os resultados de todas as coroutines passadas para asyncio.gather
e retorná-los em uma lista.
A ordem dos resultados será a mesma ordem das coroutines passadas para asyncio.gather
, mas isso não significa que as
coroutines terminarão na mesma ordem. Apenas a posição do result
na results list
é garantida ser a mesma posição
da coroutine na chamada de asyncio.gather
.
Uma observação importante é que se uma coroutine gerar um erro, asyncio.gather
não interromperá a execução de todas
as outras coroutines. Ele apenas levantará o erro e continuará a execução das outras coroutines.
Task Groups
Os task groups são uma alternativa ao asyncio.gather
que permite executar múltiplas coroutines simultaneamente e
coletar seus resultados. De acordo com a documentação, isso fornece uma garantia de segurança melhor do que
asyncio.gather
.
A principal diferença entre asyncio.TaskGroup
e asyncio.gather
é que TaskGroup levantará uma exceção se qualquer
uma das coroutines levantar uma exceção. Isso pode ser útil se você quiser interromper a execução de todas as
coroutines se uma delas falhar.
import random
import asyncio
async def fetch_products(page: int):
# Pretend we're fetching a lot of products from an API or database...
# The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
delay = random.randint(2, 2 + page)
await asyncio.sleep(delay)
print(f"Fetched products from page {page} in {delay} seconds!")
return {"products": ["product1", "product2", "product3"]}
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
product_pages_to_fetch = 5
async with asyncio.TaskGroup() as tg:
for i in range(product_pages_to_fetch):
tasks.append(tg.create_task(fetch_products(i+1)))
results = [task.result() for task in tasks]
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
Esta é uma abordagem ligeiramente diferente do exemplo anterior. Primeiro criamos um contexto de grupo de tasks
(async with asyncio.TaskGroup() as tg:
), depois criamos as tasks dentro do contexto chamando tg.create_task
.
Atenção: o método create_task
é chamado a partir de tg
(o grupo de tasks que criamos), e não de asyncio
!
Depois disso, não precisamos aguardar explicitamente por nada. O contexto que criamos (com as palavras-chave
async with
) fará isso por nós. Isso significa que quando chegarmos ao próximo comando
(results = [task.result() for task in tasks]
), todas as tasks terão terminado e poderemos coletar seus resultados.
Se uma ou mais tasks falharem, o contexto combinará as exceções em um ExceptionGroup (ou BaseExceptionGroup) e levantará
a exceção. As exceções para isso são erros de KeyboardInterrupt
e SystemExit
, que (se ocorrerem) serão levantadas em
vez do ExceptionGroup. (Novamente, não entrarei em detalhes sobre isso, mas deixarei um link na seção de referências.)
Syncronization: Locks, Semaphores, and Events
Até agora, falamos sobre tasks/coroutines que executam algum código “independente”, ou seja, que não dependem umas das outras, nem dependem de alguns recursos compartilhados. Mas e se você for realmente sortudo e, em seu projeto, as tasks precisarem acessar alguns dados comuns/compartilhados?
Lock
Um lock é uma ferramenta de sincronização que permite que apenas uma task acesse um recurso compartilhado por vez. Se uma task tentar acessar um recurso que já está sendo usado por outra task, ela esperará até que o recurso seja liberado.
import asyncio
# This will be shared between the tasks
products_fetched = 0
# this will control the access to the shared resource
lock = asyncio.Lock()
async def fetch_products():
global products_fetched
async with lock:
print("Fetching products...")
await asyncio.sleep(1)
products_fetched += random.randint(1, 100)
print(f"Products fetched: {products_fetched}")
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
concurrent_tasks = 5
async with asyncio.TaskGroup() as tg:
for i in range(concurrent_tasks):
tasks.append(tg.create_task(fetch_products()))
print(f"finished at {time.strftime('%X')}")
Neste exemplo, o código dentro da coroutine principal é bem trivial (neste ponto). O que nos interessa é o código dentro
da coroutine fetch_products
. Nela, estamos criando um contexto assíncrono usando async with lock:
, e ao fazer isso,
estamos dizendo ao asyncio que apenas uma task pode acessar o código dentro do contexto por vez.
Tá, e daí? Bom, já que apenas uma task por vez pode executar o código dentro do contexto de lock, isso significa que
nunca teremos duas tasks tentando acessar nosso recurso compartilhado (products_fetched
) ao mesmo tempo.
Vale lembrar que você deve ficar atento à quantidade de código que você coloca dentro desses contextos, porque se você tiver muita lógica que precisa ser executada dentro do lock, pode acabar criando um gargalo.
Este lock
funciona de maneira semelhante à palavra-chave lock
em C#. Se você está familiarizado com ela, só seguir a
mesma lógica.
Semaphore
Similar a um lock
, o semaphore
é uma ferramenta de sincronização que permite a um número fixo de tasks acessar um
recurso compartilhado ao mesmo tempo.
O uso é bem parecido:
import asyncio
async def fetch_products(semaphore):
async with semaphore:
print("Fetching products...")
await asyncio.sleep(1)
print("Products fetched!")
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
concurrent_tasks = 5
tasks_concurrently_doing_something = 2
semaphore = asyncio.Semaphore(tasks_concurrently_doing_something)
async with asyncio.TaskGroup() as tg:
for i in range(concurrent_tasks):
tasks.append(tg.create_task(fetch_products(semaphore)))
print(f"finished at {time.strftime('%X')}")
Neste exemplo, estamos criando um semaphore
e definindo um limite de 2 tasks que podem estar executando ao mesmo
tempo. Isso seria útil para coisas como limitar o número de requisições a uma API ou o número de conexões simultâneas a
um banco de dados.
Por exemplo, se você quiser fazer 10.000 requisições a uma API, mas nunca mais de 100 ao mesmo tempo, você poderia
definir a variável tasks_concurrently_doing_something
para 100.
Event
Um Event
é uma ferramenta de sincronização que permite a uma task sinalizar para outras tasks que algo aconteceu.
Isso é bastante útil em casos onde você tem tasks concorrentes, mas parte da task-A depende da conclusão da task-B.
Exemplo: Você tem uma task que atualiza as promoções de produtos no seu banco de dados de acordo com algumas regras de negócios, e outra task que enviará emails para os clientes sobre as novas promoções. Nesse caso, você quer garantir que os emails só serão enviados após as promoções serem atualizadas.
import asyncio
async def update_promotions(event):
print("Updating promotions...")
await asyncio.sleep(2)
print("Promotions updated!")
event.set()
async def send_emails(event):
print("Fetching users from the database...")
print("Deciding which users will receive the email...")
print("Waiting for promotions to be updated...")
await event.wait()
print("Fetching promotions...")
await asyncio.sleep(1)
print("Sending emails...")
await asyncio.sleep(1)
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
event = asyncio.Event()
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(update_promotions(event)))
tasks.append(tg.create_task(send_emails(event)))
print(f"finished at {time.strftime('%X')}")
Essa é uma maneira bem simples de resolver algumas dependências entre tasks. Assim, de preferência, melhor evitar esse tipo de dependência, mas às vezes não tem jeito.
Running in Threads
Até agora todos os exemplos que vimos estão rodando na mesma thread. Concorrentemente, mas na mesma thread. Mas e se você tiver uma task que está bloqueando o event loop?
Para resolver isso, podemos dizer ao asyncio para executar uma task específica em uma thread separada:
import asyncio
import time
def blocking_task():
print("Blocking task started...")
time.sleep(5)
print("Blocking task finished!")
async def non_blocking_task():
print("Non-blocking task started...")
await asyncio.sleep(1)
print("Non-blocking task finished!")
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
async with asyncio.TaskGroup() as tg:
threaded_task = asyncio.to_thread(blocking_task)
tasks.append(tg.create_task(threaded_task))
tasks.append(tg.create_task(non_blocking_task()))
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
Neste exemplo, a função blocking_task() vai, como o nome sugere, bloquear o event loop por 5 segundos. Para evitar isso,
estamos usando asyncio.to_thread
para rodar essa task em uma thread separada. Desta forma, o event loop não vai parar
tudo até a execução da tarefa que bloqueia a thread principal terminar.
Essa operação bloqueante poderia ser qualquer coisa; neste caso, estamos apenas usando o time.sleep(5)
para simplificar.
Atenção ao detalhe: a função blocking_task
não é uma coroutine (não há a palavra-chave async
na definição), mas
ainda podemos rodá-la em uma thread separada usando asyncio.to_thread
.
Se você quiser que a função blocking_task
seja uma coroutine, podemos fazer algumas mudanças. Veja este novo exemplo:
import asyncio
import time
async def blocking_task():
print("Blocking task started...")
await asyncio.to_thread(time.sleep, 5)
print("Blocking task finished!")
async def non_blocking_task():
print("Non-blocking task started...")
await asyncio.sleep(1)
print("Non-blocking task finished!")
async def main():
print(f"started at {time.strftime('%X')}")
# Running multiple coroutines
tasks = []
async with asyncio.TaskGroup() as tg:
tasks.append(tg.create_task(blocking_task()))
tasks.append(tg.create_task(non_blocking_task()))
print(f"finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
A principal diferença neste novo código é que blocking_task
agora é uma coroutine, e dentro dela estamos usando
await asyncio.to_thread
apenas na chamada time.sleep(5)
. Desta forma, estamos dizendo ao asyncio para executar
apenas esta parte do código em uma thread separada.
Conclusão
Beleza, este post já está bem grande, mas acho que cobre o básico do que este pacote pode fazer, e isso é apenas a
ponta do iceberg quando se trata de asyncio
. Se você quiser se aprofundar no que este pacote pode fazer,
recomendo que confira a documentação.
Espero ter ajudado! :)
Referências
- Asyncio documentation: https://docs.python.org/3/library/asyncio.html
- Event Loop: https://docs.python.org/3/library/asyncio-eventloop.html
- Coroutines: https://docs.python.org/3/library/asyncio-task.html#coroutines
- Awaitables: https://docs.python.org/3/library/asyncio-task.html#awaitables
- Await: https://docs.python.org/3/reference/expressions.html#await
- Task: https://docs.python.org/3/library/asyncio-task.html#asyncio.Task
- Create Task: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
- Gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
- Task Groups: https://docs.python.org/3/library/asyncio-task.html#task-groups
- Synchronization Primitives: https://docs.python.org/3/library/asyncio-sync.html#synchronization-primitives
- Lock: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock
- Semaphore: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore
- Event: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event
- Threads: https://docs.python.org/3/library/asyncio-task.html#running-in-threads
Tópicos que não foram cobertos neste post
- Task Cancelation: https://docs.python.org/3/library/asyncio-task.html#task-cancellation
- Exception Group: https://docs.python.org/3/library/exceptions.html#ExceptionGroup
- Base Exception Group: https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup
- Future: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future
- Condition: https://docs.python.org/3/library/asyncio-sync.html#condition
- Bounded Semaphore: https://docs.python.org/3/library/asyncio-sync.html#boundedsemaphore
- Barrier: https://docs.python.org/3/library/asyncio-sync.html#barrier