Building Web APIs

Web APIs are perfect for async programming because they spend most of their time waiting - for database queries, external API calls, and file operations. Let’s build efficient APIs step by step.

Why Async APIs Matter

Consider a traditional synchronous web server handling requests on a single worker:

  • Request 1: Get user data (100ms database query)
  • Request 2: Must wait for Request 1 to complete
  • Request 3: Must wait for Requests 1 and 2

With async APIs, all three requests can be in flight at once on a single thread: while one request waits on its query, the server works on the others.
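The difference is easy to measure outside a web server. Here is a minimal sketch that simulates three 100ms queries, first sequentially and then concurrently with asyncio.gather:

import asyncio
import time

async def fake_query():
    await asyncio.sleep(0.1)  # Stand-in for a 100ms database query

async def main():
    start = time.perf_counter()
    for _ in range(3):
        await fake_query()  # One at a time: ~0.3s total
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    await asyncio.gather(*(fake_query() for _ in range(3)))  # Overlapped: ~0.1s
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")

asyncio.run(main())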

Basic FastAPI Setup

FastAPI makes async web development straightforward:

from fastapi import FastAPI, HTTPException
import asyncio

app = FastAPI(title="My Async API")

@app.get("/")
async def root():
    return {"message": "Hello Async World!"}

The setup is minimal - declare your route handlers with async def and FastAPI runs them on the event loop.

Add endpoints that demonstrate async benefits:

@app.get("/slow-endpoint")
async def slow_endpoint():
    await asyncio.sleep(2)  # Simulate slow database query
    return {"message": "This took 2 seconds, but didn't block other requests!"}

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Simulate async database call
    await asyncio.sleep(0.1)
    
    if user_id == 404:
        raise HTTPException(status_code=404, detail="User not found")
    
    return {
        "id": user_id,
        "name": f"User {user_id}",
        "email": f"user{user_id}@example.com"
    }

# Run with: uvicorn main:app --reload
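To see the non-blocking behavior in action, hit the slow endpoint several times at once. A quick client sketch using httpx (assumed installed; any async HTTP client works) against the server started above:

import asyncio
import time
import httpx

async def main():
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        start = time.perf_counter()
        # Five concurrent requests to the 2-second endpoint
        await asyncio.gather(
            *(client.get("/slow-endpoint", timeout=10) for _ in range(5))
        )
        # Completes in roughly 2 seconds, not 10
        print(f"5 requests in {time.perf_counter() - start:.2f}s")

asyncio.run(main())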

Database Integration

Integrate with async database drivers:

import asyncpg
from fastapi import Depends

# Database connection pool
db_pool = None

@app.on_event("startup")
async def startup():
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/mydb",
        min_size=5,
        max_size=20
    )

@app.on_event("shutdown")
async def shutdown():
    if db_pool:
        await db_pool.close()

async def get_db():
    """Dependency to get database connection"""
    async with db_pool.acquire() as connection:
        yield connection

@app.get("/users")
async def list_users(limit: int = 10, db=Depends(get_db)):
    """List users with pagination"""
    query = "SELECT id, name, email FROM users LIMIT $1"
    rows = await db.fetch(query, limit)
    
    return [
        {"id": row["id"], "name": row["name"], "email": row["email"]}
        for row in rows
    ]

@app.post("/users")
async def create_user(user_data: dict, db=Depends(get_db)):
    """Create a new user"""
    query = """
        INSERT INTO users (name, email) 
        VALUES ($1, $2) 
        RETURNING id, name, email
    """
    
    row = await db.fetchrow(query, user_data["name"], user_data["email"])
    
    return {"id": row["id"], "name": row["name"], "email": row["email"]}

External API Integration

Make concurrent calls to external services:

import aiohttp

async def fetch_weather(city: str) -> dict:
    """Fetch weather data from external API"""
    async with aiohttp.ClientSession() as session:
        url = f"https://api.weather.com/v1/current?city={city}"
        
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    return {"error": "Weather service unavailable"}
        except asyncio.TimeoutError:
            return {"error": "Weather service timeout"}

async def fetch_news(category: str) -> dict:
    """Fetch news from external API"""
    async with aiohttp.ClientSession() as session:
        url = f"https://api.news.com/v1/headlines?category={category}"
        
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    return {"articles": []}  # Graceful degradation
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return {"articles": []}  # Graceful degradation

@app.get("/dashboard/{city}")
async def get_dashboard(city: str):
    """Get dashboard data from multiple sources concurrently"""
    
    # Fetch data from multiple sources simultaneously
    weather, news = await asyncio.gather(
        fetch_weather(city),
        fetch_news("technology"),
        return_exceptions=True
    )
    
    # Handle partial failures gracefully
    dashboard = {"city": city}
    
    if isinstance(weather, dict):
        dashboard["weather"] = weather
    else:
        dashboard["weather"] = {"error": "Weather unavailable"}
    
    if isinstance(news, dict):
        dashboard["news"] = news
    else:
        dashboard["news"] = {"articles": []}
    
    return dashboard
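One refinement worth noting: each ClientSession carries its own connection pool, so opening a new session per call throws away keep-alive connections. Production code typically shares one session for the life of the app. A sketch using the same startup/shutdown hooks as before (the lifespan manager shown earlier works just as well):

http_session = None  # Shared aiohttp.ClientSession, created at startup

@app.on_event("startup")
async def create_http_session():
    global http_session
    # One session (and one connection pool) shared by all requests
    http_session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=5)
    )

@app.on_event("shutdown")
async def close_http_session():
    if http_session:
        await http_session.close()

async def fetch_weather(city: str) -> dict:
    """Fetch weather data using the shared session"""
    url = f"https://api.weather.com/v1/current?city={city}"
    try:
        async with http_session.get(url) as response:
            if response.status == 200:
                return await response.json()
            return {"error": "Weather service unavailable"}
    except (aiohttp.ClientError, asyncio.TimeoutError):
        return {"error": "Weather service unavailable"}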

Background Tasks

Handle long-running operations with background tasks:

from fastapi import BackgroundTasks
import logging

logger = logging.getLogger(__name__)

async def send_email(email: str, subject: str):
    """Simulate sending email"""
    logger.info(f"Sending email to {email}: {subject}")
    await asyncio.sleep(2)  # Simulate email sending
    logger.info(f"Email sent to {email}")

async def process_image(image_path: str, user_id: int):
    """Simulate image processing"""
    logger.info(f"Processing image {image_path} for user {user_id}")
    await asyncio.sleep(5)  # Simulate processing
    logger.info(f"Image processing complete for user {user_id}")

@app.post("/users/{user_id}/upload")
async def upload_image(
    user_id: int,
    image_data: dict,
    background_tasks: BackgroundTasks
):
    """Upload and process image"""
    
    # Build the destination path (actual file saving omitted here)
    image_path = f"/uploads/{user_id}/{image_data['filename']}"
    
    # Process image in background
    background_tasks.add_task(process_image, image_path, user_id)
    
    # Send confirmation email in background
    background_tasks.add_task(
        send_email,
        "[email protected]",
        "Image Upload Confirmation"
    )
    
    return {
        "message": "Image uploaded successfully",
        "image_path": image_path,
        "status": "processing"
    }
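Background tasks run only after the response has been generated: async callables on the same event loop, sync callables in a thread pool. A quick smoke test with FastAPI's TestClient (the filename here is arbitrary):

from fastapi.testclient import TestClient

client = TestClient(app)

# The JSON body binds to the image_data parameter
response = client.post(
    "/users/42/upload",
    json={"filename": "photo.jpg"},
)
print(response.status_code)  # 200
print(response.json())       # {"message": "Image uploaded successfully", ...}

Because async tasks share the event loop, they should themselves await rather than block; CPU-heavy work belongs in a process pool or a task queue such as Celery.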

Rate Limiting

Implement basic rate limiting:

import time
from collections import defaultdict
from fastapi import Request
from fastapi.responses import JSONResponse

class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)
    
    async def is_allowed(self, client_id: str) -> bool:
        """Check if request is allowed"""
        now = time.time()
        
        # Clean old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if now - req_time < self.window_seconds
        ]
        
        # Check if under limit
        if len(self.requests[client_id]) < self.max_requests:
            self.requests[client_id].append(now)
            return True
        
        return False

# Global rate limiter: 100 requests per minute
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Rate limiting middleware"""
    client_ip = request.client.host

    if not await rate_limiter.is_allowed(client_ip):
        # Raising HTTPException here would bypass FastAPI's exception
        # handlers (middleware runs outside them), so return the
        # response directly
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."}
        )

    return await call_next(request)

Summary

Building production-ready async APIs requires:

Key Components

  • FastAPI Framework: Modern, fast async web framework
  • Database Integration: Async database drivers and connection pooling
  • External APIs: Concurrent HTTP requests with proper error handling
  • Background Tasks: Long-running operations without blocking requests
  • Rate Limiting: Protection against abuse

Best Practices

  • Use connection pooling for databases and HTTP clients
  • Implement proper error handling and logging
  • Add rate limiting to protect against abuse
  • Handle partial failures gracefully
  • Use background tasks for long-running operations

Performance Tips

  • Pool database connections
  • Reuse HTTP sessions
  • Implement timeouts for external calls
  • Use concurrent requests where possible
  • Cache expensive operations (see the sketch below)
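For that last tip, here is a minimal TTL cache sketch for async functions; get_exchange_rates is a hypothetical expensive call standing in for any coroutine:

import asyncio
import time

_cache = {}  # key -> (expires_at, value)

async def cached(key: str, ttl: float, coro_factory):
    """Return the cached value, or await coro_factory() and cache it for ttl seconds."""
    entry = _cache.get(key)
    if entry and entry[0] > time.time():
        return entry[1]
    value = await coro_factory()
    _cache[key] = (time.time() + ttl, value)
    return value

async def get_exchange_rates() -> dict:
    """Hypothetical expensive upstream call"""
    await asyncio.sleep(1)
    return {"USD": 1.0, "EUR": 0.92}

@app.get("/rates")
async def rates():
    # First call takes ~1s; repeats within 30 seconds are served from cache
    return await cached("rates", ttl=30, coro_factory=get_exchange_rates)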

In Part 10, we’ll explore data processing pipelines that can handle large-scale data efficiently.