Python asyncio Asynchronous Programming Guide

What Is asyncio?

asyncio is Python’s asynchronous I/O framework. It enables efficient handling of tasks with long I/O wait times, such as network requests, file reads, and database queries. It can handle thousands of connections simultaneously without threads, making it widely used in web servers, crawlers, chatbots, and more.

This article covers the concept of event loops, async/await syntax, concurrent execution patterns, and error handling with practical examples.

Synchronous vs Asynchronous — Why asyncio?

In synchronous code, the program blocks until an I/O operation completes. In asynchronous code, other tasks can run while waiting for I/O.

import asyncio
import time

# Synchronous: runs 3 tasks sequentially (total 3 seconds)
def sync_tasks():
    for i in range(3):
        print(f"Task {i} started")
        time.sleep(1)  # Blocks for 1 second
        print(f"Task {i} completed")

# Asynchronous: runs 3 tasks concurrently (total 1 second)
async def async_task(task_id):
    print(f"Task {task_id} started")
    await asyncio.sleep(1)  # Waits 1 second (other tasks can run)
    print(f"Task {task_id} completed")

async def async_tasks():
    await asyncio.gather(
        async_task(0),
        async_task(1),
        async_task(2),
    )

# Synchronous: takes ~3 seconds
start = time.perf_counter()
sync_tasks()
print(f"Sync elapsed time: {time.perf_counter() - start:.2f}s")
# Sync elapsed time: 3.00s

# Asynchronous: takes ~1 second
start = time.perf_counter()
asyncio.run(async_tasks())
print(f"Async elapsed time: {time.perf_counter() - start:.2f}s")
# Async elapsed time: 1.00s
AspectSynchronous (sync)Asynchronous (async)
I/O waitingBlockingNon-blocking
Execution modelSequential executionEvent loop-based concurrent execution
Best suited forCPU-bound tasksI/O-bound tasks
ComplexityLowModerate

Core Concepts — Coroutines and the Event Loop

A function defined with async def is a coroutine. Calling a coroutine directly does not execute it — it returns a coroutine object. The event loop schedules and runs it.

import asyncio

async def fetch_user(user_id: int) -> dict:
    """Fetches user information asynchronously."""
    print(f"User {user_id} query started")
    await asyncio.sleep(0.5)  # Simulates a DB query
    print(f"User {user_id} query completed")
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    # Calling a coroutine directly does not execute it
    coro = fetch_user(1)
    print(type(coro))  # <class 'coroutine'>

    # Use await to execute and get the result
    user = await coro
    print(user)  # {'id': 1, 'name': 'User_1'}

# asyncio.run() creates the event loop and runs the coroutine
asyncio.run(main())

asyncio.run() creates an event loop, runs the coroutine until completion, and then closes the loop. It is typically called only once at the program’s entry point.

Concurrent Execution — gather and TaskGroup

To run multiple coroutines concurrently, use asyncio.gather() or TaskGroup.

import asyncio

async def fetch_url(url: str) -> str:
    """Simulates fetching data from a URL"""
    delay = len(url) * 0.1  # Delay proportional to URL length
    await asyncio.sleep(delay)
    return f"{url}: {len(url)} bytes"

async def with_gather():
    """Concurrent execution with gather — returns results as a list"""
    urls = ["https://api.example.com/users",
            "https://api.example.com/posts",
            "https://api.example.com/comments"]

    results = await asyncio.gather(*[fetch_url(u) for u in urls])
    for result in results:
        print(result)
    # https://api.example.com/users: 30 bytes
    # https://api.example.com/posts: 30 bytes
    # https://api.example.com/comments: 33 bytes

async def with_taskgroup():
    """Concurrent execution with TaskGroup (Python 3.11+) — structured concurrency"""
    urls = ["https://api.example.com/users",
            "https://api.example.com/posts"]

    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_url(urls[0]))
        task2 = tg.create_task(fetch_url(urls[1]))

    # All tasks are guaranteed to be complete after exiting the TaskGroup block
    print(task1.result())  # https://api.example.com/users: 30 bytes
    print(task2.result())  # https://api.example.com/posts: 30 bytes

asyncio.run(with_gather())
asyncio.run(with_taskgroup())

TaskGroup is a structured concurrency pattern introduced in Python 3.11. If one task raises an exception, the remaining tasks are automatically cancelled, providing safer error handling than gather.

Timeouts and Cancellation

Set timeouts on async operations to prevent indefinite waiting.

import asyncio

async def slow_operation():
    """Simulates a long-running operation"""
    await asyncio.sleep(10)
    return "Done"

async def main():
    # asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Timeout! Exceeded 2 seconds")

    # asyncio.wait_for (compatible with pre-3.11)
    try:
        result = await asyncio.wait_for(
            slow_operation(), timeout=2.0
        )
    except asyncio.TimeoutError:
        print("wait_for timeout!")

asyncio.run(main())
# Timeout! Exceeded 2 seconds
# wait_for timeout!

Semaphore — Limiting Concurrency

When calling external APIs, you often need to limit the number of concurrent requests. Use asyncio.Semaphore.

import asyncio

async def fetch_with_limit(sem: asyncio.Semaphore,
                           url: str) -> str:
    async with sem:  # Acquire semaphore (limits concurrent execution)
        print(f"Request started: {url}")
        await asyncio.sleep(1)  # Simulates an API call
        print(f"Request completed: {url}")
        return f"{url}: OK"

async def main():
    sem = asyncio.Semaphore(3)  # Maximum 3 concurrent
    urls = [f"https://api.example.com/item/{i}"
            for i in range(10)]

    results = await asyncio.gather(
        *[fetch_with_limit(sem, u) for u in urls]
    )
    print(f"Total {len(results)} completed")

asyncio.run(main())
# Runs 3 at a time, 10 total completed

Async Iterators and Generators

Use async for and async def ... yield to process asynchronous streams.

import asyncio

async def fetch_pages(total: int):
    """Async generator that fetches pages one by one"""
    for page in range(1, total + 1):
        await asyncio.sleep(0.3)  # Simulates page loading
        yield {"page": page, "items": [f"item_{i}" for i in range(5)]}

async def main():
    async for page_data in fetch_pages(3):
        print(f"Page {page_data['page']}: "
              f"{len(page_data['items'])} items")
    # Page 1: 5 items
    # Page 2: 5 items
    # Page 3: 5 items

asyncio.run(main())

Summary

asyncio is Python’s core asynchronous framework that dramatically improves performance for I/O-bound tasks.

  • Basic syntax: Define coroutines with async def, wait for execution with await
  • Concurrent execution: Run multiple coroutines simultaneously with asyncio.gather()
  • Structured concurrency: Safe task management with TaskGroup (3.11+)
  • Timeouts: Prevent indefinite waiting with asyncio.timeout() or wait_for()
  • Concurrency limits: Restrict concurrent execution count with Semaphore
  • Async iteration: Process streaming data with async for
  • Caveat: For CPU-bound tasks, use multiprocessing instead of asyncio

Was this article helpful?