What Is asyncio?
asyncio is Python’s asynchronous I/O framework. It enables efficient handling of tasks with long I/O wait times, such as network requests, file reads, and database queries. It can handle thousands of connections concurrently on a single thread, making it widely used in web servers, crawlers, chatbots, and more.
This article covers the concept of event loops, async/await syntax, concurrent execution patterns, and error handling with practical examples.
Synchronous vs Asynchronous — Why asyncio?
In synchronous code, the program blocks until an I/O operation completes. In asynchronous code, other tasks can run while waiting for I/O.
```python
import asyncio
import time

# Synchronous: runs 3 tasks sequentially (total 3 seconds)
def sync_tasks():
    for i in range(3):
        print(f"Task {i} started")
        time.sleep(1)  # Blocks for 1 second
        print(f"Task {i} completed")

# Asynchronous: runs 3 tasks concurrently (total 1 second)
async def async_task(task_id):
    print(f"Task {task_id} started")
    await asyncio.sleep(1)  # Waits 1 second (other tasks can run)
    print(f"Task {task_id} completed")

async def async_tasks():
    await asyncio.gather(
        async_task(0),
        async_task(1),
        async_task(2),
    )

# Synchronous: takes ~3 seconds
start = time.perf_counter()
sync_tasks()
print(f"Sync elapsed time: {time.perf_counter() - start:.2f}s")
# Sync elapsed time: 3.00s

# Asynchronous: takes ~1 second
start = time.perf_counter()
asyncio.run(async_tasks())
print(f"Async elapsed time: {time.perf_counter() - start:.2f}s")
# Async elapsed time: 1.00s
```
| Aspect | Synchronous (sync) | Asynchronous (async) |
|---|---|---|
| I/O waiting | Blocking | Non-blocking |
| Execution model | Sequential execution | Event loop-based concurrent execution |
| Best suited for | CPU-bound tasks | I/O-bound tasks |
| Complexity | Low | Moderate |
Core Concepts — Coroutines and the Event Loop
A function defined with async def is a coroutine function. Calling it does not execute its body; it returns a coroutine object, which the event loop schedules and runs.
```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    """Fetches user information asynchronously."""
    print(f"User {user_id} query started")
    await asyncio.sleep(0.5)  # Simulates a DB query
    print(f"User {user_id} query completed")
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    # Calling a coroutine function does not execute it
    coro = fetch_user(1)
    print(type(coro))  # <class 'coroutine'>

    # Use await to execute and get the result
    user = await coro
    print(user)  # {'id': 1, 'name': 'User_1'}

# asyncio.run() creates the event loop and runs the coroutine
asyncio.run(main())
```
asyncio.run() creates an event loop, runs the coroutine until completion, and then closes the loop. It is typically called only once at the program’s entry point.
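Besides awaiting a coroutine directly, you can hand it to the event loop with asyncio.create_task(), which schedules it to run immediately as a Task. A minimal sketch (the background_job name is just for illustration):

```python
import asyncio

async def background_job(name: str) -> str:
    await asyncio.sleep(0.1)  # simulates I/O work
    return f"{name} done"

async def main() -> list[str]:
    # create_task() schedules the coroutines on the loop right away,
    # so both jobs run concurrently
    t1 = asyncio.create_task(background_job("job-1"))
    t2 = asyncio.create_task(background_job("job-2"))
    # Awaiting the tasks collects their results
    return [await t1, await t2]

print(asyncio.run(main()))  # ['job-1 done', 'job-2 done']
```

Unlike a bare coroutine, a Task starts running as soon as the loop gets control, even before it is awaited.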
Concurrent Execution — gather and TaskGroup
To run multiple coroutines concurrently, use asyncio.gather() or TaskGroup.
```python
import asyncio

async def fetch_url(url: str) -> str:
    """Simulates fetching data from a URL"""
    delay = len(url) * 0.1  # Delay proportional to URL length
    await asyncio.sleep(delay)
    return f"{url}: {len(url)} bytes"

async def with_gather():
    """Concurrent execution with gather: returns results as a list"""
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
    ]
    results = await asyncio.gather(*[fetch_url(u) for u in urls])
    for result in results:
        print(result)
    # https://api.example.com/users: 29 bytes
    # https://api.example.com/posts: 29 bytes
    # https://api.example.com/comments: 32 bytes

async def with_taskgroup():
    """Concurrent execution with TaskGroup (Python 3.11+): structured concurrency"""
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
    ]
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_url(urls[0]))
        task2 = tg.create_task(fetch_url(urls[1]))
    # All tasks are guaranteed to be complete after exiting the TaskGroup block
    print(task1.result())  # https://api.example.com/users: 29 bytes
    print(task2.result())  # https://api.example.com/posts: 29 bytes

asyncio.run(with_gather())
asyncio.run(with_taskgroup())
```
TaskGroup is a structured concurrency pattern introduced in Python 3.11. If one task raises an exception, the remaining tasks are automatically cancelled, providing safer error handling than gather.
Timeouts and Cancellation
Set timeouts on async operations to prevent indefinite waiting.
```python
import asyncio

async def slow_operation():
    """Simulates a long-running operation"""
    await asyncio.sleep(10)
    return "Done"

async def main():
    # asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Timeout! Exceeded 2 seconds")

    # asyncio.wait_for (compatible with pre-3.11)
    try:
        result = await asyncio.wait_for(
            slow_operation(), timeout=2.0
        )
    except asyncio.TimeoutError:
        print("wait_for timeout!")

asyncio.run(main())
# Timeout! Exceeded 2 seconds
# wait_for timeout!
```
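A task can also be cancelled explicitly with Task.cancel(), which raises CancelledError inside the coroutine at its next await point. A minimal sketch of the cleanup pattern:

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)  # long wait
        return "finished"
    except asyncio.CancelledError:
        print("worker: cleaning up before exit")
        raise  # always re-raise after cleanup

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)  # let the worker start
    task.cancel()             # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("main: worker was cancelled")

asyncio.run(main())
# worker: cleaning up before exit
# main: worker was cancelled
```

Re-raising CancelledError after cleanup is important: swallowing it keeps the task alive and breaks the timeout machinery, which relies on cancellation internally.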
Semaphore — Limiting Concurrency
When calling external APIs, you often need to limit the number of concurrent requests. Use asyncio.Semaphore.
```python
import asyncio

async def fetch_with_limit(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:  # Acquire semaphore (limits concurrent execution)
        print(f"Request started: {url}")
        await asyncio.sleep(1)  # Simulates an API call
        print(f"Request completed: {url}")
        return f"{url}: OK"

async def main():
    sem = asyncio.Semaphore(3)  # Maximum 3 concurrent
    urls = [f"https://api.example.com/item/{i}" for i in range(10)]
    results = await asyncio.gather(
        *[fetch_with_limit(sem, u) for u in urls]
    )
    print(f"Total {len(results)} completed")

asyncio.run(main())
# Runs 3 at a time, 10 total completed
```
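The scheduling math can be checked with a timing measurement: 10 calls with a limit of 3 proceed in ceil(10 / 3) = 4 waves, so 1-second calls take about 4 seconds in total. A sketch with delays shortened to 0.1 s to keep the check fast:

```python
import asyncio
import time

async def limited_call(sem: asyncio.Semaphore) -> None:
    async with sem:
        await asyncio.sleep(0.1)  # shortened stand-in for a 1-second API call

async def main() -> float:
    sem = asyncio.Semaphore(3)  # at most 3 sleeps overlap
    start = time.perf_counter()
    await asyncio.gather(*[limited_call(sem) for _ in range(10)])
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"Elapsed: {elapsed:.2f}s")  # roughly 0.4s: 4 waves of 0.1s each
```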
Async Iterators and Generators
Use async for and async def ... yield to process asynchronous streams.
```python
import asyncio

async def fetch_pages(total: int):
    """Async generator that fetches pages one by one"""
    for page in range(1, total + 1):
        await asyncio.sleep(0.3)  # Simulates page loading
        yield {"page": page, "items": [f"item_{i}" for i in range(5)]}

async def main():
    async for page_data in fetch_pages(3):
        print(f"Page {page_data['page']}: "
              f"{len(page_data['items'])} items")
    # Page 1: 5 items
    # Page 2: 5 items
    # Page 3: 5 items

asyncio.run(main())
```
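async for also works inside comprehensions (PEP 530), which drain an async generator into a list in a single expression. A minimal sketch with a made-up numbers() generator:

```python
import asyncio

async def numbers(n: int):
    """Async generator yielding 0..n-1 with a small delay"""
    for i in range(n):
        await asyncio.sleep(0.01)
        yield i

async def main() -> list[int]:
    # An async list comprehension collects every yielded value
    return [i * i async for i in numbers(4)]

print(asyncio.run(main()))  # [0, 1, 4, 9]
```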
Summary
asyncio is Python’s core asynchronous framework that dramatically improves performance for I/O-bound tasks.
- Basic syntax: Define coroutines with `async def`, wait for execution with `await`
- Concurrent execution: Run multiple coroutines simultaneously with `asyncio.gather()`
- Structured concurrency: Safe task management with `TaskGroup` (3.11+)
- Timeouts: Prevent indefinite waiting with `asyncio.timeout()` or `wait_for()`
- Concurrency limits: Restrict concurrent execution count with `Semaphore`
- Async iteration: Process streaming data with `async for`
- Caveat: For CPU-bound tasks, use `multiprocessing` instead of asyncio