WebSockets and Real-time Features
WebSockets enable real-time, bidirectional communication between clients and servers. Let’s explore implementing WebSocket servers and clients with async Python.
Basic WebSocket Server
Create a WebSocket server with FastAPI:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
import json
app = FastAPI()
First, we need a connection manager to handle multiple WebSocket connections:
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # broadcast() may already have removed a dead connection
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
The connection manager maintains a list of active WebSocket connections. When clients connect, we accept the connection and add it to our list.
For messaging, we need both personal and broadcast capabilities:
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy so that removals do not skip entries
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Remove dead connections
                if connection in self.active_connections:
                    self.active_connections.remove(connection)

manager = ConnectionManager()
Broadcasting handles connection failures gracefully by removing dead connections. This prevents the server from trying to send to closed connections.
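Awaiting each send in turn means one slow client delays everyone after it in the list. A minimal sketch of a concurrent alternative follows, assuming only that each connection exposes an async `send_text` method, as FastAPI's `WebSocket` does. `FakeSocket` and `broadcast_concurrently` are hypothetical names used for illustration so the example runs without a server:

```python
import asyncio


class FakeSocket:
    """Stand-in for a WebSocket; only send_text is modelled (illustrative)."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail
        self.sent = []

    async def send_text(self, message: str) -> None:
        if self.fail:
            raise ConnectionError(f"{self.name} is closed")
        self.sent.append(message)


async def broadcast_concurrently(connections, message: str) -> list:
    """Send to every connection at once; return the ones that accepted the message."""
    snapshot = list(connections)  # work on a copy, never the live list
    results = await asyncio.gather(
        *(conn.send_text(message) for conn in snapshot),
        return_exceptions=True,  # collect failures instead of aborting the batch
    )
    return [
        conn for conn, result in zip(snapshot, results)
        if not isinstance(result, BaseException)
    ]


async def demo() -> list:
    sockets = [FakeSocket("a"), FakeSocket("b", fail=True), FakeSocket("c")]
    alive = await broadcast_concurrently(sockets, "hello")
    return [s.name for s in alive]
```

Running `asyncio.run(demo())` keeps the two healthy sockets and drops the failing one. The caller would then replace its connection list with the returned one.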
Now we can create the WebSocket endpoint:
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            message = json.loads(data)

            # Process message based on type
            if message["type"] == "broadcast":
                await manager.broadcast(f"Client {client_id}: {message['content']}")
            elif message["type"] == "echo":
                await manager.send_personal_message(
                    f"Echo: {message['content']}", websocket
                )
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client {client_id} disconnected")
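The endpoint above trusts whatever the client sends: malformed JSON or a missing `type` key would raise inside the receive loop. One possible guard is sketched below. `parse_message` and `ALLOWED_TYPES` are hypothetical names, and the rule that `content` must be a string is an assumption rather than something the endpoint requires:

```python
import json
from typing import Optional

# The two message types the endpoint handles
ALLOWED_TYPES = ("broadcast", "echo")


def parse_message(raw: str) -> Optional[dict]:
    """Return a validated message dict, or None if the payload is unusable."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict):
        return None  # valid JSON, but not an object
    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None
    if not isinstance(message.get("content"), str):
        return None  # assumed rule: content must be a string
    return message
```

Inside the receive loop, the endpoint could call `parse_message(data)` and skip the message, or send back an error, when the result is `None`.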
WebSocket Client
Create a WebSocket client:
import asyncio
import websockets
import json

class WebSocketClient:
    def __init__(self, uri: str):
        self.uri = uri
        self.websocket = None

    async def connect(self):
        self.websocket = await websockets.connect(self.uri)

    async def send_message(self, message_type: str, content: str):
        if self.websocket:
            message = json.dumps({"type": message_type, "content": content})
            await self.websocket.send(message)

    async def listen_for_messages(self):
        if not self.websocket:
            return
        try:
            async for message in self.websocket:
                print(f"Received: {message}")
        except websockets.exceptions.ConnectionClosed:
            print("Connection closed")

    async def close(self):
        if self.websocket:
            await self.websocket.close()
Real-time Chat Application
Build a simple chat system:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
from datetime import datetime

app = FastAPI()

class ChatRoom:
    def __init__(self):
        self.connections = {}
        self.message_history = []

    async def add_user(self, websocket: WebSocket, username: str):
        await websocket.accept()
        self.connections[username] = websocket

        # Send recent messages to new user
        for message in self.message_history[-10:]:
            await websocket.send_text(json.dumps(message))

    def remove_user(self, username: str):
        if username in self.connections:
            del self.connections[username]

    async def broadcast_message(self, message: dict):
        """Broadcast message to all connected users"""
        self.message_history.append(message)
        message_json = json.dumps(message)

        dead_connections = []
        # Iterate over a snapshot: users may join or leave while we await
        for username, websocket in list(self.connections.items()):
            try:
                await websocket.send_text(message_json)
            except Exception:
                dead_connections.append(username)

        # Clean up dead connections
        for username in dead_connections:
            self.remove_user(username)

chat_room = ChatRoom()
@app.websocket("/chat/{username}")
async def chat_endpoint(websocket: WebSocket, username: str):
    await chat_room.add_user(websocket, username)
    try:
        while True:
            data = await websocket.receive_text()
            message_data = json.loads(data)
            message = {
                "username": username,
                "content": message_data["content"],
                "timestamp": datetime.now().isoformat()
            }
            await chat_room.broadcast_message(message)
    except WebSocketDisconnect:
        chat_room.remove_user(username)
Real-time Data Streaming
Stream live data to clients:
import asyncio
import json
import random
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class DataStreamer:
    def __init__(self):
        self.subscribers = set()

    def subscribe(self, websocket):
        self.subscribers.add(websocket)

    def unsubscribe(self, websocket):
        self.subscribers.discard(websocket)

    async def start_streaming(self):
        """Start streaming data to all subscribers"""
        while True:
            # Generate sample data
            data = {
                "timestamp": datetime.now().isoformat(),
                "temperature": round(random.uniform(20, 30), 2),
                "humidity": round(random.uniform(40, 80), 2)
            }
            payload = json.dumps(data)  # Serialize once for all subscribers

            # Send to all subscribers; iterate over a snapshot because the
            # set can change while we await each send
            dead_connections = set()
            for websocket in list(self.subscribers):
                try:
                    await websocket.send_text(payload)
                except Exception:
                    dead_connections.add(websocket)

            # Remove dead connections
            self.subscribers -= dead_connections
            await asyncio.sleep(1)

streamer = DataStreamer()
@app.websocket("/stream")
async def stream_endpoint(websocket: WebSocket):
    await websocket.accept()
    streamer.subscribe(websocket)
    try:
        while True:
            # Keep the connection open; incoming messages are ignored
            await websocket.receive_text()
    except WebSocketDisconnect:
        streamer.unsubscribe(websocket)
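Note that nothing in the code above actually calls `start_streaming`, so it has to be scheduled somewhere or subscribers will never receive data. One way, sketched here, is an async context manager that starts the loop as a background task and cancels it on shutdown. FastAPI accepts a function of this shape through its `lifespan` parameter (`FastAPI(lifespan=lifespan)`). `TickStreamer` is a hypothetical stand-in for `DataStreamer` so the example runs without a server:

```python
import asyncio
from contextlib import asynccontextmanager, suppress


class TickStreamer:
    """Hypothetical stand-in for DataStreamer: it only counts loop iterations."""

    def __init__(self):
        self.ticks = 0

    async def start_streaming(self):
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)


streamer = TickStreamer()


@asynccontextmanager
async def lifespan(app):
    # Start the streaming loop when the application starts
    task = asyncio.create_task(streamer.start_streaming())
    try:
        yield
    finally:
        # Stop it cleanly when the application shuts down
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task


async def demo() -> int:
    async with lifespan(None):  # FastAPI would pass the app instance here
        await asyncio.sleep(0.05)
    return streamer.ticks
```

Keeping a reference to the task matters: a task that nothing references can be garbage-collected before it finishes.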
Best Practices
Key WebSocket implementation principles:
Connection Management:
- Track active connections properly
- Clean up dead connections automatically
- Handle disconnections gracefully
Message Handling:
- Use JSON for structured messages
- Implement message types for different actions
- Validate incoming messages
Error Handling:
- Implement reconnection logic on client side
- Use try/except blocks around WebSocket operations
- Log connection events for debugging
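As an illustration of client-side reconnection, here is a minimal retry helper with exponential backoff. `connect_with_retry` is a hypothetical name, and retrying only on `OSError` (which covers refused connections) is an assumption; a real client may need to retry on library-specific handshake errors as well:

```python
import asyncio
import random


async def connect_with_retry(connect, max_attempts: int = 5, base_delay: float = 0.5):
    """Await `connect()` until it succeeds or the attempts run out.

    `connect` is any zero-argument coroutine function, such as the
    `connect` method of the WebSocketClient shown earlier.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except OSError:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            # Double the delay after each failure and add a little jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))


async def demo() -> tuple:
    attempts = 0

    async def flaky_connect():
        # Fails twice, then succeeds (simulates a server that is still starting)
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionRefusedError("server not ready")
        return "connected"

    result = await connect_with_retry(flaky_connect, base_delay=0.01)
    return result, attempts
```

The jitter spreads out reconnection attempts so that many clients dropped at the same moment do not all retry in lockstep.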
Performance:
- Limit message history size
- Remove inactive connections promptly
- Use connection pooling for multiple clients
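To make the history limit concrete: the chat room's `message_history` list grows without bound, and one simple way to cap it is `collections.deque` with `maxlen`. The cap of 100 below is an assumed value, not one taken from the chat example:

```python
from collections import deque

MAX_HISTORY = 100  # assumed cap; tune for the application

# Once full, appending drops the oldest entry automatically
message_history = deque(maxlen=MAX_HISTORY)

for i in range(250):
    message_history.append({"username": "demo", "content": f"message {i}"})

# Deques do not support slicing, so copy to a list to take the last ten
recent = list(message_history)[-10:]
```

After the loop only the newest 100 messages remain, and `recent` holds the last ten of them.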
Summary
WebSocket implementation essentials:
- Use FastAPI or similar frameworks for WebSocket servers
- Implement proper connection management
- Handle disconnections and errors gracefully
- Use structured messages with JSON
- Implement reconnection logic for robust clients
- Clean up resources properly
WebSockets enable powerful real-time features in async applications, from chat systems to live data streaming.
In Part 19, we’ll explore health checks and monitoring.