Building Web APIs
Web APIs are perfect for async programming because they spend most of their time waiting - for database queries, external API calls, and file operations. Let’s build efficient APIs step by step.
Why Async APIs Matter
Consider a traditional synchronous web server:
- Request 1: Get user data (100ms database query)
- Request 2: Must wait for Request 1 to complete
- Request 3: Must wait for Requests 1 and 2
With async APIs, all three requests can run simultaneously, sharing the same server resources efficiently.
Basic FastAPI Setup
FastAPI makes async web development straightforward:
from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI(title="My Async API")
@app.get("/")
async def root():
return {"message": "Hello Async World!"}
The basic setup is simple - just add async
to your route handlers to enable async processing.
Add endpoints that demonstrate async benefits:
@app.get("/slow-endpoint")
async def slow_endpoint():
await asyncio.sleep(2) # Simulate slow database query
return {"message": "This took 2 seconds, but didn't block other requests!"}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Simulate async database call
await asyncio.sleep(0.1)
if user_id == 404:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com"
}
# Run with: uvicorn main:app --reload
Database Integration
Integrate with async database drivers:
import asyncpg
from fastapi import Depends
# Database connection pool
db_pool = None
@app.on_event("startup")
async def startup():
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/mydb",
min_size=5,
max_size=20
)
@app.on_event("shutdown")
async def shutdown():
if db_pool:
await db_pool.close()
async def get_db():
"""Dependency to get database connection"""
async with db_pool.acquire() as connection:
yield connection
@app.get("/users")
async def list_users(limit: int = 10, db=Depends(get_db)):
"""List users with pagination"""
query = "SELECT id, name, email FROM users LIMIT $1"
rows = await db.fetch(query, limit)
return [
{"id": row["id"], "name": row["name"], "email": row["email"]}
for row in rows
]
@app.post("/users")
async def create_user(user_data: dict, db=Depends(get_db)):
"""Create a new user"""
query = """
INSERT INTO users (name, email)
VALUES ($1, $2)
RETURNING id, name, email
"""
row = await db.fetchrow(query, user_data["name"], user_data["email"])
return {"id": row["id"], "name": row["name"], "email": row["email"]}
External API Integration
Make concurrent calls to external services:
import aiohttp
async def fetch_weather(city: str) -> dict:
"""Fetch weather data from external API"""
async with aiohttp.ClientSession() as session:
url = f"https://api.weather.com/v1/current?city={city}"
try:
async with session.get(url, timeout=5) as response:
if response.status == 200:
return await response.json()
else:
return {"error": "Weather service unavailable"}
except asyncio.TimeoutError:
return {"error": "Weather service timeout"}
async def fetch_news(category: str) -> dict:
"""Fetch news from external API"""
async with aiohttp.ClientSession() as session:
url = f"https://api.news.com/v1/headlines?category={category}"
try:
async with session.get(url, timeout=5) as response:
if response.status == 200:
return await response.json()
else:
return {"articles": []} # Graceful degradation
except:
return {"articles": []} # Graceful degradation
@app.get("/dashboard/{city}")
async def get_dashboard(city: str):
"""Get dashboard data from multiple sources concurrently"""
# Fetch data from multiple sources simultaneously
weather, news = await asyncio.gather(
fetch_weather(city),
fetch_news("technology"),
return_exceptions=True
)
# Handle partial failures gracefully
dashboard = {"city": city}
if isinstance(weather, dict):
dashboard["weather"] = weather
else:
dashboard["weather"] = {"error": "Weather unavailable"}
if isinstance(news, dict):
dashboard["news"] = news
else:
dashboard["news"] = {"articles": []}
return dashboard
Background Tasks
Handle long-running operations with background tasks:
from fastapi import BackgroundTasks
import logging
logger = logging.getLogger(__name__)
async def send_email(email: str, subject: str):
"""Simulate sending email"""
logger.info(f"Sending email to {email}: {subject}")
await asyncio.sleep(2) # Simulate email sending
logger.info(f"Email sent to {email}")
async def process_image(image_path: str, user_id: int):
"""Simulate image processing"""
logger.info(f"Processing image {image_path} for user {user_id}")
await asyncio.sleep(5) # Simulate processing
logger.info(f"Image processing complete for user {user_id}")
@app.post("/users/{user_id}/upload")
async def upload_image(
user_id: int,
image_data: dict,
background_tasks: BackgroundTasks
):
"""Upload and process image"""
# Save image immediately
image_path = f"/uploads/{user_id}/{image_data['filename']}"
# Process image in background
background_tasks.add_task(process_image, image_path, user_id)
# Send confirmation email in background
background_tasks.add_task(
send_email,
"[email protected]",
"Image Upload Confirmation"
)
return {
"message": "Image uploaded successfully",
"image_path": image_path,
"status": "processing"
}
Rate Limiting
Implement basic rate limiting:
import time
from collections import defaultdict
from fastapi import Request, HTTPException
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
async def is_allowed(self, client_id: str) -> bool:
"""Check if request is allowed"""
now = time.time()
# Clean old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if now - req_time < self.window_seconds
]
# Check if under limit
if len(self.requests[client_id]) < self.max_requests:
self.requests[client_id].append(now)
return True
return False
# Global rate limiter: 100 requests per minute
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
async def rate_limit_middleware(request: Request, call_next):
"""Rate limiting middleware"""
client_ip = request.client.host
if not await rate_limiter.is_allowed(client_ip):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded. Try again later."
)
response = await call_next(request)
return response
app.middleware("http")(rate_limit_middleware)
Summary
Building production-ready async APIs requires:
Key Components
- FastAPI Framework: Modern, fast async web framework
- Database Integration: Async database drivers and connection pooling
- External APIs: Concurrent HTTP requests with proper error handling
- Background Tasks: Long-running operations without blocking requests
- Rate Limiting: Protection against abuse
Best Practices
- Use connection pooling for databases and HTTP clients
- Implement proper error handling and logging
- Add rate limiting to protect against abuse
- Handle partial failures gracefully
- Use background tasks for long-running operations
Performance Tips
- Pool database connections
- Reuse HTTP sessions
- Implement timeouts for external calls
- Use concurrent requests where possible
- Cache expensive operations
In Part 10, we’ll explore data processing pipelines that can handle large-scale data efficiently.