I avoided asyncio for years. Callbacks, event loops, futures — it all felt like unnecessary complexity when threads worked fine. Then we had an API endpoint making 200 sequential HTTP calls to an upstream service. 45 seconds per request. We threw asyncio.gather at it and the whole thing dropped to 3 seconds. That was the moment it clicked.

Python’s async story has matured enormously. What used to be a mess of yield from and manual loop management is now clean, readable, and genuinely powerful. If you’ve been putting off learning asyncio properly, this is the guide I wish I’d had.


The Event Loop: What’s Actually Happening

The event loop is the engine. It’s a single-threaded loop that manages and distributes the execution of coroutines. When a coroutine hits an await, it yields control back to the loop, which picks up another coroutine that’s ready to run.

This is fundamentally different from threading. There’s no preemptive switching, no race conditions from shared state, no GIL contention. You decide when to yield. That’s the deal.

import asyncio

async def fetch_data(name: str, delay: float) -> str:
    print(f"{name}: starting")
    await asyncio.sleep(delay)  # yields control here
    print(f"{name}: done")
    return f"{name} result"

async def main():
    results = await asyncio.gather(
        fetch_data("A", 2),
        fetch_data("B", 1),
        fetch_data("C", 3),
    )
    print(results)

asyncio.run(main())

All three coroutines run concurrently on a single thread. Total time: ~3 seconds, not 6. The asyncio.run() call creates the event loop, runs main(), and tears everything down cleanly.

One thing that tripped me up early: asyncio.run() creates a new event loop each time. You can’t nest it. If you’re already inside an async context and try calling asyncio.run(), you’ll get a RuntimeError. Use await directly instead.
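A minimal sketch of that failure mode, with inner() standing in for any coroutine:

```python
import asyncio

async def inner() -> int:
    return 42

async def outer() -> int:
    try:
        # Wrong: a second event loop can't be started inside a running one
        asyncio.run(inner())
    except RuntimeError:
        pass  # "asyncio.run() cannot be called from a running event loop"
    # Right: await it on the loop you're already in
    return await inner()

result = asyncio.run(outer())  # 42
```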

If you’ve worked with Go’s concurrency model, the mental model is similar — goroutines yield at I/O points, and the scheduler handles the rest. Python’s asyncio is more explicit about it, which I actually prefer. You can see exactly where the context switches happen.
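To see what "you decide when to yield" means in practice, here's a small sketch: a coroutine that calls blocking time.sleep() never yields, so it holds the loop hostage and everything else waits.

```python
import asyncio
import time

order = []

async def greedy():
    time.sleep(0.1)  # blocking call: the loop is frozen for the full 0.1s
    order.append("greedy")

async def cooperative():
    await asyncio.sleep(0.05)  # non-blocking: yields control to the loop
    order.append("cooperative")

async def main():
    await asyncio.gather(greedy(), cooperative())

asyncio.run(main())
# cooperative's shorter sleep can't even start until greedy finishes,
# so order is ["greedy", "cooperative"]
```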


async/await: The Basics Done Right

An async def function returns a coroutine object. It doesn’t execute until you await it or schedule it as a task. This catches people out constantly.

async def compute():
    return 42

# Wrong - this just creates a coroutine object, doesn't run it
coro = compute()  # RuntimeWarning: coroutine was never awaited

# Right - but only inside an async def; await is a SyntaxError at module level
result = await compute()

You can await three things: coroutines, tasks, and futures. In practice, you’ll mostly await coroutines and tasks.
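Futures are the low-level building block — you'll rarely create one by hand, but here's a quick sketch of awaiting a Future that a callback resolves later:

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # an empty result slot

    # Something else fills it in later — here, a timed callback
    loop.call_later(0.01, fut.set_result, "resolved")

    return await fut  # suspends until set_result fires

value = asyncio.run(main())  # "resolved"
```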

The await keyword is your yield point. Between two await expressions, your code runs uninterrupted. This is why asyncio doesn’t have the same data race issues as threading — you know exactly when another coroutine might run.

async def transfer(from_account, to_account, amount):
    balance = from_account.balance  # no await, so no switch
    from_account.balance -= amount   # still atomic
    to_account.balance += amount     # still atomic
    await save_to_db(from_account)   # HERE another coroutine could run
    await save_to_db(to_account)     # and HERE

That said, the gap between those two save_to_db calls is a real concern. If the first succeeds and the second fails, you’ve got inconsistent state. Async doesn’t magically solve transactional problems. You still need proper error handling, which I’ll get to.


Tasks: Fire and (Don’t) Forget

A Task wraps a coroutine and schedules it to run on the event loop. This is how you get actual concurrency — awaiting coroutines one after another runs them sequentially; scheduling them as tasks is what lets them overlap. (gather and TaskGroup create tasks for you under the hood.)
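A quick timing sketch of that difference — two awaits back to back take the sum of their delays, while two tasks overlap:

```python
import asyncio
import time

async def job():
    await asyncio.sleep(0.1)

async def sequential():
    await job()  # the second job doesn't start until the first finishes
    await job()

async def concurrent():
    t1 = asyncio.create_task(job())  # both start running immediately
    t2 = asyncio.create_task(job())
    await t1
    await t2

start = time.perf_counter()
asyncio.run(sequential())
seq_time = time.perf_counter() - start  # ~0.2s

start = time.perf_counter()
asyncio.run(concurrent())
conc_time = time.perf_counter() - start  # ~0.1s
```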

async def background_job(name: str):
    await asyncio.sleep(1)
    print(f"{name} completed")

async def main():
    # These run concurrently
    task1 = asyncio.create_task(background_job("job-1"))
    task2 = asyncio.create_task(background_job("job-2"))

    # Do other work while tasks run...
    print("Tasks are running in the background")

    # Wait for them when you need the results
    await task1
    await task2

I’ve seen a lot of code that creates tasks and never awaits them. Don’t do this. If the event loop shuts down before a task completes, it gets cancelled silently. Worse, if the task raises an exception and nobody’s listening, you get a scary “Task exception was never retrieved” warning.

# Bad - fire and forget
async def main():
    asyncio.create_task(some_job())  # might never complete
    # main exits, loop shuts down, task dies

# Better - track your tasks
async def main():
    task = asyncio.create_task(some_job())
    try:
        await task
    except Exception as e:
        logging.error(f"Task failed: {e}")

Tasks also support cancellation, which is genuinely useful for timeouts:

async def main():
    task = asyncio.create_task(slow_operation())
    try:
        result = await asyncio.wait_for(task, timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out, task was cancelled")

This pattern comes up all the time in production. If you’ve read my piece on Python context managers, you’ll appreciate how cleanly this composes with resource cleanup.


gather vs TaskGroup: Pick Your Fighter

asyncio.gather() has been the workhorse for running multiple coroutines concurrently since the beginning. It works. But it has a nasty edge case with error handling that’s bitten me more than once.

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        return_exceptions=True,  # don't let one failure kill everything
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"One failed: {r}")
        else:
            print(f"Got: {r}")

Without return_exceptions=True, the first exception propagates up immediately — but here's the gotcha: the other awaitables are not cancelled and keep running in the background. With return_exceptions=True, exceptions get mixed into the results list alongside successful values, and you end up writing isinstance checks everywhere. It's ugly.

Python 3.11 introduced TaskGroup, and I prefer it these days. It’s a structured concurrency primitive — all tasks in the group are guaranteed to complete (or be cancelled) before the group exits. No orphaned tasks.

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
        task3 = tg.create_task(fetch_user(3))

    # All tasks are done here
    print(task1.result(), task2.result(), task3.result())

If any task raises an exception, the TaskGroup cancels all remaining tasks and raises an ExceptionGroup. This is cleaner because you’re forced to handle errors explicitly rather than silently swallowing them in a results list.

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(might_fail())
            tg.create_task(might_also_fail())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
    except* ConnectionError as eg:
        for exc in eg.exceptions:
            print(f"Connection issue: {exc}")

The except* syntax (also from 3.11) handles ExceptionGroup — you can catch different exception types from the group separately. It took me a while to warm up to this syntax, but it’s genuinely better than the gather approach for anything non-trivial.

My rule of thumb: use gather for quick scripts and simple fan-out. Use TaskGroup for production code where error handling matters. Which is… most code.


Semaphores: Don’t Melt Your Dependencies

Here’s a mistake I see constantly. Someone discovers asyncio, gets excited about concurrency, and fires off 10,000 HTTP requests simultaneously. The upstream API rate-limits them, the database connection pool exhausts, or they just OOM their own process.

Semaphores are the fix. They limit how many coroutines can run a particular section of code concurrently.

sem = asyncio.Semaphore(20)  # max 20 concurrent

async def fetch_with_limit(url: str) -> str:
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(500)]
    tasks = [fetch_with_limit(url) for url in urls]
    results = await asyncio.gather(*tasks)

All 500 tasks are created immediately, but only 20 run at any given time. The rest wait at the semaphore. This is the pattern I use for basically every bulk I/O operation.

A subtlety: I’m creating a new ClientSession per request in that example for clarity, but in production you’d want to share a single session. Creating sessions is expensive.

async def main():
    sem = asyncio.Semaphore(20)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, sem, url) for url in urls]
        results = await asyncio.gather(*tasks)

async def fetch(session, sem, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

If you’ve used Python decorators, you might be tempted to write a semaphore decorator. I’ve done it. It works nicely:

import functools

def with_semaphore(sem):
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped coroutine's name and docs
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)
        return wrapper
    return decorator

rate_limit = asyncio.Semaphore(10)

@with_semaphore(rate_limit)
async def call_api(endpoint: str):
    ...

aiohttp in Practice

aiohttp is the async HTTP library. If you’re doing any kind of API integration, web scraping, or microservice communication with asyncio, you’re using aiohttp (or httpx, which also has async support — both are solid).

Here’s the pattern I reach for most often — a reusable async client with retries, timeouts, and connection pooling:

import aiohttp
import asyncio

async def fetch_all(urls: list[str]) -> list[dict]:
    timeout = aiohttp.ClientTimeout(total=30, connect=5)
    connector = aiohttp.TCPConnector(limit=50)

    async with aiohttp.ClientSession(
        timeout=timeout, connector=connector
    ) as session:
        sem = asyncio.Semaphore(20)
        tasks = [_fetch_one(session, sem, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def _fetch_one(session, sem, url, retries=3):
    async with sem:
        for attempt in range(retries):
            try:
                async with session.get(url) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if attempt == retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # exponential backoff

Key things happening here:

  • TCPConnector(limit=50) caps the total number of open connections. This prevents socket exhaustion.
  • ClientTimeout sets both overall and connection-specific timeouts. Always set these. I’ve seen production services hang indefinitely because someone forgot a timeout.
  • The semaphore limits concurrency independently of the connection pool. You might want 50 connections but only 20 active requests.
  • Exponential backoff on retries. Don’t hammer a failing service.

The ClientSession should be created once and reused. Creating a session per request is a common mistake — each session creates its own connection pool, which defeats the purpose entirely.

For more complex scenarios, I’ll wrap this in a class and use generators to stream results as they complete rather than waiting for everything:

async def fetch_as_completed(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        sem = asyncio.Semaphore(20)
        tasks = [
            asyncio.create_task(_fetch_one(session, sem, url))
            for url in urls
        ]
        for coro in asyncio.as_completed(tasks):
            try:
                yield await coro
            except Exception as e:
                yield {"error": str(e)}

asyncio.as_completed() is underrated. It yields futures in the order they complete, not the order they were submitted. Perfect for progress reporting or early termination.
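A self-contained sketch of that ordering, with asyncio.sleep standing in for real I/O:

```python
import asyncio

async def fake_fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> list[str]:
    tasks = [
        asyncio.create_task(fake_fetch("slow", 0.15)),
        asyncio.create_task(fake_fetch("fast", 0.05)),
        asyncio.create_task(fake_fetch("medium", 0.1)),
    ]
    finished = []
    for fut in asyncio.as_completed(tasks):
        finished.append(await fut)  # completion order, not submission order
    return finished

order = asyncio.run(main())  # ["fast", "medium", "slow"]
```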


Error Handling: The Hard Part

Error handling in async code is where things get genuinely tricky. A single unhandled exception in a task can bring down your entire application, or worse, silently disappear.

Rule one: always handle exceptions in tasks.

async def resilient_worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        try:
            await process(item)
        except Exception:
            logging.exception(f"Failed to process {item}")
        finally:
            queue.task_done()

Rule two: use asyncio.shield() when you have critical operations that shouldn’t be cancelled.

async def handle_request(data):
    result = await compute(data)
    # Don't cancel the database write even if the request is cancelled
    await asyncio.shield(save_to_db(result))

shield() is a bit of a footgun though. The shielded coroutine keeps running even if the outer task is cancelled, but if you don’t hold a reference to it, the result gets lost. I use it sparingly.
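When I do reach for it, I keep my own handle on the inner task so the result isn't lost — a sketch, with critical_write as a hypothetical stand-in for the database save:

```python
import asyncio

results = []

async def critical_write():
    await asyncio.sleep(0.05)
    results.append("written")

async def handler():
    inner = asyncio.ensure_future(critical_write())  # our own reference
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        await inner  # we were cancelled, but the write still completes
        raise

async def main():
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.01)
    task.cancel()  # cancel the "request" mid-flight
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())  # results == ["written"]
```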

Rule three: set a global exception handler for anything that slips through.

def handle_exception(loop, context):
    exc = context.get("exception", context["message"])
    logging.error(f"Unhandled async exception: {exc}")

async def main():
    # get_event_loop() outside a running loop is deprecated since 3.10,
    # so set the handler from inside your top-level coroutine
    asyncio.get_running_loop().set_exception_handler(handle_exception)

In production, I combine all of these. The worker-queue pattern with proper exception handling is my go-to for any async processing pipeline:

async def run_pipeline(items: list, concurrency: int = 10):
    queue = asyncio.Queue()
    for item in items:
        await queue.put(item)

    async def worker():
        while True:  # don't poll queue.empty() — it races with other workers
            item = await queue.get()
            try:
                await process(item)
            except Exception:
                logging.exception(f"Failed: {item}")
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    await queue.join()  # blocks until every item has been task_done()'d
    for w in workers:
        w.cancel()  # workers are now idle in queue.get(); cancel is safe
    await asyncio.gather(*workers, return_exceptions=True)

This gives you bounded concurrency, proper error isolation (one failure doesn’t kill the pipeline), and clean shutdown. It’s the pattern behind most of the async services I’ve built.


Debugging Async Code

Debugging asyncio is painful. I won’t sugarcoat it. Stack traces are fragmented across coroutine boundaries, and print debugging becomes your best friend more often than you’d like.

Enable debug mode. Always, during development.

asyncio.run(main(), debug=True)

This catches common mistakes: coroutines that were never awaited, tasks that took too long without yielding, and slow callbacks blocking the event loop. The warnings are genuinely helpful.

For finding where your code is spending time, set slow_callback_duration on the running loop — with debug mode on, anything that blocks the loop longer than the threshold (default 0.1 seconds) gets logged:

asyncio.get_running_loop().slow_callback_duration = 0.05  # seconds

The other tool I lean on is asyncio.all_tasks():

async def debug_tasks():
    for task in asyncio.all_tasks():
        print(f"{task.get_name()}: {task.get_coro()}")

Name your tasks. Seriously. When you’ve got 200 tasks running and something’s stuck, Task-147 tells you nothing. fetch-user-42 tells you everything.

asyncio.create_task(fetch_user(42), name="fetch-user-42")

When NOT to Use asyncio

Async isn’t a silver bullet. I’ve seen teams rewrite perfectly fine synchronous code in asyncio and end up with something slower and harder to maintain.

Don’t use asyncio for CPU-bound work. It won’t help. The event loop runs on a single thread — if you’re crunching numbers, you’re blocking everything else. Use multiprocessing or, if you’re feeling adventurous, check out the experimental free-threaded (no-GIL) build of Python 3.13.

Don’t use asyncio if your I/O is already fast enough. If you’re making 5 API calls and they take 200ms total, the complexity of async isn’t worth it. Just use requests.

Don’t use asyncio if your team doesn’t understand it. Async code that’s written wrong is worse than synchronous code. Forgotten awaits, unhandled task exceptions, blocking calls in async functions — these bugs are subtle and hard to find.

Do use asyncio when:

  • You’re making many concurrent I/O calls (HTTP, database, file system)
  • You’re building a server that handles many simultaneous connections
  • You need to manage long-lived connections (WebSockets, streaming)
  • Your bottleneck is I/O wait time, not CPU

Mixing Sync and Async

Real codebases aren’t purely async. You’ll have synchronous libraries, blocking I/O calls, and legacy code that can’t be rewritten. asyncio handles this with run_in_executor:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    # Run blocking code in a thread pool
    result = await loop.run_in_executor(
        None,  # default executor
        blocking_io_function,
        arg1, arg2,
    )

The default executor is a ThreadPoolExecutor. For CPU-bound work, pass a ProcessPoolExecutor instead. This is how you integrate synchronous database drivers, file I/O libraries, or any blocking code into an async application without freezing the event loop.

Going the other direction — calling async code from sync code — is messier. asyncio.run() works if there’s no running loop. If there is (like inside a Jupyter notebook or a framework that manages its own loop), you need nest_asyncio or a background thread running the loop. It’s not pretty, but it works.
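One workable sketch of the background-thread option: run a loop in a daemon thread and hand coroutines to it with asyncio.run_coroutine_threadsafe(), which returns a concurrent.futures.Future you can block on from sync code.

```python
import asyncio
import threading

# Run an event loop forever in a background daemon thread
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def async_work(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

# Plain synchronous code: submit the coroutine and wait for its result
future = asyncio.run_coroutine_threadsafe(async_work(21), loop)
result = future.result(timeout=5)  # 42

loop.call_soon_threadsafe(loop.stop)  # shut the background loop down
```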


Putting It Together

Here’s a real-world-ish example that combines everything — a service that fetches data from multiple APIs, processes it, and stores results. This is essentially the pattern behind that 45-second-to-3-second optimization I mentioned at the start.

import asyncio
import aiohttp
import logging

logging.basicConfig(level=logging.INFO)

async def fetch_and_process(
    urls: list[str],
    concurrency: int = 20,
) -> list[dict]:
    sem = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(
                    _process_url(session, sem, url),
                    name=f"fetch-{i}",
                )
                for i, url in enumerate(urls)
            ]

    return [t.result() for t in tasks if not t.cancelled()]

async def _process_url(session, sem, url):
    async with sem:
        for attempt in range(3):
            try:
                async with session.get(url) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    return transform(data)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if attempt == 2:
                    logging.error(f"Failed after 3 attempts: {url}")
                    raise
                await asyncio.sleep(2 ** attempt)

def transform(data: dict) -> dict:
    # your business logic here
    return {"processed": True, **data}

if __name__ == "__main__":
    urls = [f"https://api.example.com/v1/items/{i}" for i in range(200)]
    results = asyncio.run(fetch_and_process(urls), debug=True)
    logging.info(f"Processed {len(results)} items")

Semaphore for rate limiting. TaskGroup for structured concurrency. Named tasks for debugging. Retries with backoff. Shared session with timeouts. This is the template I start from for any async I/O workload.

asyncio isn’t going anywhere. With structured concurrency in 3.11, except* for exception groups, and the ecosystem maturing around aiohttp, httpx, and asyncpg, it’s become a core skill for Python developers. The learning curve is real, but once the mental model clicks — and it will — you won’t want to go back to writing sequential I/O code.