Tutorial básico do Asyncio com Python (#dev #python #asyncio #concurrency #tutorial)

Tutorial básico do Asyncio com Python

Overview

“Um dev resolveu um problema implementando uma solução assíncrona. Agora. mais. problema. tem. não.” - Provérbio antigo no X Twitter.

Neste post vamos passar pelos conceitos básicos do asyncio em Python, apenas o suficiente para você começar a usar.

O que é asyncio?

De acordo com a documentação oficial:

asyncio é uma biblioteca para escrever código concorrente usando a sintaxe async/await. asyncio é usado como base para múltiplos frameworks assíncronos em Python que fornecem servidores web e de rede de alta performance, bibliotecas de conexão a bancos de dados, filas de tarefas distribuídas, etc.

TL;DR: asyncio é um pacote que permite executar duas ou mais coisas ao mesmo tempo.

Conceitos Básicos

Event Loop

O event loop é o ponto central do asyncio. É um loop que executa tarefas e callbacks, e é responsável por agendar e rodar o código. Pense nele como o coordenador da execução do seu código assíncrono. Ele sabe todas as tarefas que estão pendentes e quando elas devem ser executadas.

Se uma tarefa está no loop (sendo executada), mas está esperando que algo aconteça (como uma solicitação de rede), o loop irá pausar a tarefa e executar outra. Quando o evento esperado acontecer, o loop retomará a tarefa.

Coroutines

Coroutines são funções que podem pausar e retomar sua execução. Elas são definidas com a sintaxe async def. Quando você chama uma coroutine, ela retorna um objeto do tipo coroutine, que você pode passar para o event loop para ser executado.

import asyncio

async def main():
    print("Hello, world from a coroutine!")
    
if __name__ == "__main__":
    # Runs the coroutine
    asyncio.run(main())

Quando você chama asyncio.run(), ele cria um loop de eventos, executa a coroutine e fecha o loop quando ela termina, e no nosso caso, ela terminará após mostrar “Olá, mundo de uma coroutine!”, já que não estamos fazendo nada em um loop ou esperando por algo.

Uma coisa que vale a pena notar é que nesta linha asyncio.run(main()), parece que estamos executando o código dentro do método, mas não é bem o caso. Quando a função (coroutine) main é chamada, ela retorna um objeto do tipo Coroutine. Esse objeto será usado pelo loop principal para executar o código dentro da função.

De início, isso pode ser um pouco confuso, já que normalmente, quando você passa um callback, você não o executa imediatamente. Mas, neste caso, estamos gerando o objeto coroutine para que o loop de eventos saiba o que fazer.

Await

A palavra-chave await é usada para pausar a execução de uma coroutine até que uma certa condição seja atendida. Assim como você faria em qualquer outra linguagem de programação. A diferença é que, em Python, você só pode usar await dentro de uma coroutine.

import asyncio

async def main():
    print("Hello, world from a coroutine!")
    await asyncio.sleep(1)
    print("Hello, world after 1 second!")
    
if __name__ == "__main__":
    asyncio.run(main())

No exemplo acima, estamos usando await asyncio.sleep(1) para pausar a execução da coroutine por 1 segundo. Depois disso, a execução continuará, e a segunda instrução print será executada.

Você também pode aguardar outras coroutines:

import asyncio

async def fetch_products():
    # Pretend we're fetching a lot of products from an API or database...
    await asyncio.sleep(5)
    print("Fetched ALL the products!")
    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    
    task = fetch_products()
    
    products = await task
    
    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

Neste exemplo, estamos chamando a coroutine fetch_products e aguardando seu resultado. A coroutine fetch_products pausará por 5 segundos, e então imprimirá “Buscamos TODOS os produtos!”. Depois disso, a coroutine main continuará sua execução.

Importante observar que na coroutine main, estamos chamando fetch_products sem a palavra-chave await, o que cria uma tarefa. Nesse ponto, o código dentro de fetch_products não está sendo executado. Somente quando aguardamos a tarefa, o código será executado. Se nunca aguardarmos a tarefa, o código dentro de fetch_products nunca será executado.

Outra coisa importante é que quando aguardamos a tarefa, não precisamos executá-la novamente (como await task()). Internamente, aquele objeto coroutine será usado pelo loop de eventos para executar o código. Você está apenas informando ao loop de eventos que irá esperar pelo resultado naquele ponto do seu programa.

Tasks

As tasks são usadas para agendar coroutines para serem executadas pelo loop de eventos. Com as tasks, você pode executar várias coroutines simultaneamente.

import random
import asyncio

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Creating tasks...
    task1 = asyncio.create_task(fetch_products(1))
    task2 = asyncio.create_task(fetch_products(2))
    task3 = asyncio.create_task(fetch_products(3))
    task4 = asyncio.create_task(fetch_products(4))
    
    # Waiting for all tasks to finish...
    result1 = await task1
    result2 = await task2
    result3 = await task3
    result4 = await task4

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

No exemplo acima, estamos criando 4 tasks diferentes, cada uma buscando produtos de uma página diferente, simulando que estamos buscando produtos de uma API paginada.

Você deve estar se perguntando: Pelo que eu disse na seção anterior, esse código vai executar cada task uma após a outra, certo?

Não exatamente. A grande diferença é que estamos usando asyncio.create_task para criar as tasks. Então, não estamos aguardando coroutines regulares, estamos criando tasks que serão executadas pelo loop de eventos o mais rápido possível. Assim, neste caso, sempre que a task estiver dormindo, o loop de eventos executará outra task. Então, elas serão executadas concorrentemente, e a coroutine principal só terminará quando todas as tasks estiverem concluídas.

Bem legal, né? Não precisávamos criar nenhum controle especial para conseguir isso.

Observação: Você também pode cancelar tasks, mas não acho que isso se enquadre na categoria “básico”. Vou deixar um link sobre esse tópico na documentação no final do post.

Gather

A função asyncio.gather executa várias coroutines ao mesmo tempo e retorna seus resultados depois que todas elas terminam.

import random
import asyncio

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Creating tasks...
    task1 = asyncio.create_task(fetch_products(1))
    task2 = asyncio.create_task(fetch_products(2))
    task3 = asyncio.create_task(fetch_products(3))
    task4 = asyncio.create_task(fetch_products(4))
    
    # Waiting for all tasks to finish...
    result1 = await task1
    result2 = await task2
    result3 = await task3
    result4 = await task4

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

Neste exemplo, não precisamos chamar asyncio.create_task para criar as tasks, mas estamos “dizendo” ao asyncio que ele deve coletar os resultados de todas as coroutines passadas para asyncio.gather e retorná-los em uma lista.

A ordem dos resultados será a mesma ordem das coroutines passadas para asyncio.gather, mas isso não significa que as coroutines terminarão na mesma ordem. Apenas a posição do result na results list é garantida ser a mesma posição da coroutine na chamada de asyncio.gather.

Uma observação importante é que se uma coroutine gerar um erro, asyncio.gather não interromperá a execução de todas as outras coroutines. Ele apenas levantará o erro e continuará a execução das outras coroutines.

Task Groups

Os task groups são uma alternativa ao asyncio.gather que permite executar múltiplas coroutines simultaneamente e coletar seus resultados. De acordo com a documentação, isso fornece uma garantia de segurança melhor do que asyncio.gather.

A principal diferença entre asyncio.TaskGroup e asyncio.gather é que TaskGroup levantará uma exceção se qualquer uma das coroutines levantar uma exceção. Isso pode ser útil se você quiser interromper a execução de todas as coroutines se uma delas falhar.

import random
import asyncio

async def fetch_products(page: int):
    # Pretend we're fetching a lot of products from an API or database...
    # The delay will be between 2 and 2 + page seconds, just to simulate a real-world scenario
    delay = random.randint(2, 2 + page)
    
    await asyncio.sleep(delay)
    print(f"Fetched products from page {page} in {delay} seconds!")

    return {"products": ["product1", "product2", "product3"]}
    
async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    product_pages_to_fetch = 5

    async with asyncio.TaskGroup() as tg:
        for i in range(product_pages_to_fetch):
            tasks.append(tg.create_task(fetch_products(i+1)))

    results = [task.result() for task in tasks]

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

Esta é uma abordagem ligeiramente diferente do exemplo anterior. Primeiro criamos um contexto de grupo de tasks (async with asyncio.TaskGroup() as tg:), depois criamos as tasks dentro do contexto chamando tg.create_task. Atenção: o método create_task é chamado a partir de tg (o grupo de tasks que criamos), e não de asyncio!

Depois disso, não precisamos aguardar explicitamente por nada. O contexto que criamos (com as palavras-chave async with) fará isso por nós. Isso significa que quando chegarmos ao próximo comando (results = [task.result() for task in tasks]), todas as tasks terão terminado e poderemos coletar seus resultados.

Se uma ou mais tasks falharem, o contexto combinará as exceções em um ExceptionGroup (ou BaseExceptionGroup) e levantará a exceção. As exceções para isso são erros de KeyboardInterrupt e SystemExit, que (se ocorrerem) serão levantadas em vez do ExceptionGroup. (Novamente, não entrarei em detalhes sobre isso, mas deixarei um link na seção de referências.)

Syncronization: Locks, Semaphores, and Events

Até agora, falamos sobre tasks/coroutines que executam algum código “independente”, ou seja, que não dependem umas das outras, nem dependem de alguns recursos compartilhados. Mas e se você for realmente sortudo e, em seu projeto, as tasks precisarem acessar alguns dados comuns/compartilhados?

Lock

Um lock é uma ferramenta de sincronização que permite que apenas uma task acesse um recurso compartilhado por vez. Se uma task tentar acessar um recurso que já está sendo usado por outra task, ela esperará até que o recurso seja liberado.

import asyncio

# This will be shared between the tasks
products_fetched = 0

# this will control the access to the shared resource
lock = asyncio.Lock()

async def fetch_products():
    global products_fetched
    async with lock:
        print("Fetching products...")
        await asyncio.sleep(1)
        products_fetched += random.randint(1, 100)
        print(f"Products fetched: {products_fetched}")


async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5

    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products()))

    print(f"finished at {time.strftime('%X')}")

Neste exemplo, o código dentro da coroutine principal é bem trivial (neste ponto). O que nos interessa é o código dentro da coroutine fetch_products. Nela, estamos criando um contexto assíncrono usando async with lock:, e ao fazer isso, estamos dizendo ao asyncio que apenas uma task pode acessar o código dentro do contexto por vez.

Tá, e daí? Bom, já que apenas uma task por vez pode executar o código dentro do contexto de lock, isso significa que nunca teremos duas tasks tentando acessar nosso recurso compartilhado (products_fetched) ao mesmo tempo.

Vale lembrar que você deve ficar atento à quantidade de código que você coloca dentro desses contextos, porque se você tiver muita lógica que precisa ser executada dentro do lock, pode acabar criando um gargalo.

Este lock funciona de maneira semelhante à palavra-chave lock em C#. Se você está familiarizado com ela, só seguir a mesma lógica.

Semaphore

Similar a um lock, o semaphore é uma ferramenta de sincronização que permite a um número fixo de tasks acessar um recurso compartilhado ao mesmo tempo.

O uso é bem parecido:

import asyncio

async def fetch_products(semaphore):
    async with semaphore:
        print("Fetching products...")
        await asyncio.sleep(1)
        print("Products fetched!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    concurrent_tasks = 5
    tasks_concurrently_doing_something = 2

    semaphore = asyncio.Semaphore(tasks_concurrently_doing_something)

    async with asyncio.TaskGroup() as tg:
        for i in range(concurrent_tasks):
            tasks.append(tg.create_task(fetch_products(semaphore)))

    print(f"finished at {time.strftime('%X')}")

Neste exemplo, estamos criando um semaphore e definindo um limite de 2 tasks que podem estar executando ao mesmo tempo. Isso seria útil para coisas como limitar o número de requisições a uma API ou o número de conexões simultâneas a um banco de dados.

Por exemplo, se você quiser fazer 10.000 requisições a uma API, mas nunca mais de 100 ao mesmo tempo, você poderia definir a variável tasks_concurrently_doing_something para 100.

Event

Um Event é uma ferramenta de sincronização que permite a uma task sinalizar para outras tasks que algo aconteceu. Isso é bastante útil em casos onde você tem tasks concorrentes, mas parte da task-A depende da conclusão da task-B.

Exemplo: Você tem uma task que atualiza as promoções de produtos no seu banco de dados de acordo com algumas regras de negócios, e outra task que enviará emails para os clientes sobre as novas promoções. Nesse caso, você quer garantir que os emails só serão enviados após as promoções serem atualizadas.

import asyncio

async def update_promotions(event):
    print("Updating promotions...")
    await asyncio.sleep(2)
    print("Promotions updated!")
    event.set()
    
async def send_emails(event):
    print("Fetching users from the database...")
    print("Deciding which users will receive the email...")
    print("Waiting for promotions to be updated...")
    await event.wait()
    print("Fetching promotions...")
    await asyncio.sleep(1)
    print("Sending emails...")
    await asyncio.sleep(1)

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(update_promotions(event)))
        tasks.append(tg.create_task(send_emails(event)))

    print(f"finished at {time.strftime('%X')}")

Essa é uma maneira bem simples de resolver algumas dependências entre tasks. Assim, de preferência, melhor evitar esse tipo de dependência, mas às vezes não tem jeito.

Running in Threads

Até agora todos os exemplos que vimos estão rodando na mesma thread. Concorrentemente, mas na mesma thread. Mas e se você tiver uma task que está bloqueando o event loop?

Para resolver isso, podemos dizer ao asyncio para executar uma task específica em uma thread separada:

import asyncio
import time

def blocking_task():
    print("Blocking task started...")
    time.sleep(5)
    print("Blocking task finished!")

async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")

async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []

    async with asyncio.TaskGroup() as tg:
        threaded_task = asyncio.to_thread(blocking_task)
        tasks.append(tg.create_task(threaded_task))
        tasks.append(tg.create_task(non_blocking_task()))

    print(f"finished at {time.strftime('%X')}")

if __name__ == "__main__":
    asyncio.run(main())

Neste exemplo, a função blocking_task() vai, como o nome sugere, bloquear o event loop por 5 segundos. Para evitar isso, estamos usando asyncio.to_thread para rodar essa task em uma thread separada. Desta forma, o event loop não vai parar tudo até a execução da tarefa que bloqueia a thread principal terminar.

Essa operação bloqueante poderia ser qualquer coisa; neste caso, estamos apenas usando o time.sleep(5) para simplificar.

Atenção ao detalhe: a função blocking_task não é uma coroutine (não há a palavra-chave async na definição), mas ainda podemos rodá-la em uma thread separada usando asyncio.to_thread.

Se você quiser que a função blocking_task seja uma coroutine, podemos fazer algumas mudanças. Veja este novo exemplo:

import asyncio
import time


async def blocking_task():
    print("Blocking task started...")
    await asyncio.to_thread(time.sleep, 5)
    print("Blocking task finished!")


async def non_blocking_task():
    print("Non-blocking task started...")
    await asyncio.sleep(1)
    print("Non-blocking task finished!")


async def main():
    print(f"started at {time.strftime('%X')}")
    # Running multiple coroutines
    tasks = []

    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(blocking_task()))
        tasks.append(tg.create_task(non_blocking_task()))

    print(f"finished at {time.strftime('%X')}")


if __name__ == "__main__":
    asyncio.run(main())

A principal diferença neste novo código é que blocking_task agora é uma coroutine, e dentro dela estamos usando await asyncio.to_thread apenas na chamada time.sleep(5). Desta forma, estamos dizendo ao asyncio para executar apenas esta parte do código em uma thread separada.

Conclusão

Beleza, este post já está bem grande, mas acho que cobre o básico do que este pacote pode fazer, e isso é apenas a ponta do iceberg quando se trata de asyncio. Se você quiser se aprofundar no que este pacote pode fazer, recomendo que confira a documentação.

Espero ter ajudado! :)

Referências

Tópicos que não foram cobertos neste post

Traduções: