WebSockets and Real-time Features

WebSockets enable real-time, bidirectional communication between clients and servers. Let’s explore implementing WebSocket servers and clients with async Python.

Basic WebSocket Server

Create a WebSocket server with FastAPI:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
import json

app = FastAPI()

First, we need a connection manager to handle multiple WebSocket connections:

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        # broadcast() may already have pruned this connection
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

The connection manager maintains a list of active WebSocket connections. When clients connect, we accept the connection and add it to our list.

For messaging, we need both personal and broadcast capabilities. These two methods belong to the ConnectionManager class:

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
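    async def broadcast(self, message: str):
        # Iterate over a copy so removals do not skip entries mid-loop
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Remove dead connections (unless already removed elsewhere)
                if connection in self.active_connections:
                    self.active_connections.remove(connection)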

manager = ConnectionManager()

If a send fails, broadcast treats that connection as dead and removes it from the list, so later broadcasts do not keep trying to send to closed connections.
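
One subtlety when pruning during a broadcast: removing items from a list while looping over that same list makes the loop skip the element that follows each removal. Here is a minimal sketch of the safer pattern, iterating over a snapshot. FakeConnection is a hypothetical stand-in for a WebSocket so the example runs without a server; it is not part of FastAPI.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a WebSocket (illustration only)."""

    def __init__(self, name: str, alive: bool = True):
        self.name = name
        self.alive = alive
        self.received = []

    async def send_text(self, message: str) -> None:
        if not self.alive:
            raise ConnectionError(f"{self.name} is closed")
        self.received.append(message)


async def broadcast(connections: list, message: str) -> None:
    # Loop over a snapshot so that removals cannot skip entries
    for connection in list(connections):
        try:
            await connection.send_text(message)
        except Exception:
            # Drop the dead connection from the live list
            if connection in connections:
                connections.remove(connection)


async def demo() -> list:
    a = FakeConnection("a")
    b = FakeConnection("b", alive=False)
    c = FakeConnection("c", alive=False)
    d = FakeConnection("d")
    connections = [a, b, c, d]
    await broadcast(connections, "hello")
    return [conn.name for conn in connections]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With two dead connections next to each other (b and c), a loop that removes from the list it is iterating would skip c and leave it in place. The snapshot version leaves only a and d.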

Now we can create the WebSocket endpoint:

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    
    try:
        while True:
            # Receive message from client
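            data = await websocket.receive_text()
            try:
                message = json.loads(data)
                msg_type = message["type"]
                content = message["content"]
            except (json.JSONDecodeError, KeyError, TypeError):
                # Reject malformed input instead of crashing the handler
                await manager.send_personal_message("Error: invalid message", websocket)
                continue
            
            # Process message based on type
            if msg_type == "broadcast":
                await manager.broadcast(f"Client {client_id}: {content}")
            elif msg_type == "echo":
                await manager.send_personal_message(f"Echo: {content}", websocket)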
                
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client {client_id} disconnected")

WebSocket Client

Create a WebSocket client with the websockets library:

import asyncio
import websockets
import json

class WebSocketClient:
    def __init__(self, uri: str):
        self.uri = uri
        self.websocket = None
    
    async def connect(self):
        self.websocket = await websockets.connect(self.uri)
    
    async def send_message(self, message_type: str, content: str):
        if self.websocket:
            message = json.dumps({"type": message_type, "content": content})
            await self.websocket.send(message)
    
    async def listen_for_messages(self):
        if not self.websocket:
            return
        
        try:
            async for message in self.websocket:
                print(f"Received: {message}")
        except websockets.exceptions.ConnectionClosed:
            print("Connection closed")
    
    async def close(self):
        if self.websocket:
            await self.websocket.close()

Real-time Chat Application

Build a simple chat system. The endpoint below reuses the app instance created for the basic server:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
from datetime import datetime

class ChatRoom:
    def __init__(self):
        self.connections = {}
        self.message_history = []
    
    async def add_user(self, websocket: WebSocket, username: str):
        await websocket.accept()
        self.connections[username] = websocket
        
        # Send recent messages to new user
        for message in self.message_history[-10:]:
            await websocket.send_text(json.dumps(message))
    
    def remove_user(self, username: str):
        if username in self.connections:
            del self.connections[username]
    
    async def broadcast_message(self, message: dict):
        """Broadcast message to all connected users"""
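        self.message_history.append(message)
        # Keep only the most recent 100 messages (an arbitrary cap) so memory stays bounded
        del self.message_history[:-100]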
        message_json = json.dumps(message)
        
        dead_connections = []
        # Iterate over a snapshot: users may join or leave while we await
        for username, websocket in list(self.connections.items()):
            try:
                await websocket.send_text(message_json)
            except Exception:
                dead_connections.append(username)
        
        # Clean up dead connections
        for username in dead_connections:
            self.remove_user(username)

chat_room = ChatRoom()

@app.websocket("/chat/{username}")
async def chat_endpoint(websocket: WebSocket, username: str):
    await chat_room.add_user(websocket, username)
    
    try:
        while True:
            data = await websocket.receive_text()
            message_data = json.loads(data)
            
            message = {
                "username": username,
                "content": message_data["content"],
                "timestamp": datetime.now().isoformat()
            }
            
            await chat_room.broadcast_message(message)
            
    except WebSocketDisconnect:
        chat_room.remove_user(username)

Real-time Data Streaming

Stream live data to clients:

import asyncio
import json
import random
from datetime import datetime

class DataStreamer:
    def __init__(self):
        self.subscribers = set()
    
    def subscribe(self, websocket):
        self.subscribers.add(websocket)
    
    def unsubscribe(self, websocket):
        self.subscribers.discard(websocket)
    
    async def start_streaming(self):
        """Start streaming data to all subscribers"""
        while True:
            # Generate sample data
            data = {
                "timestamp": datetime.now().isoformat(),
                "temperature": round(random.uniform(20, 30), 2),
                "humidity": round(random.uniform(40, 80), 2)
            }
            
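            # Send to all subscribers (iterate over a snapshot, since
            # clients may subscribe or unsubscribe while we await)
            payload = json.dumps(data)
            dead_connections = set()
            for websocket in list(self.subscribers):
                try:
                    await websocket.send_text(payload)
                except Exception:
                    dead_connections.add(websocket)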
            
            # Remove dead connections
            self.subscribers -= dead_connections
            await asyncio.sleep(1)

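streamer = DataStreamer()
# Note: start_streaming() must be scheduled once (for example with
# asyncio.create_task at application startup), or no data is ever sent.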

@app.websocket("/stream")
async def stream_endpoint(websocket: WebSocket):
    await websocket.accept()
    streamer.subscribe(websocket)
    
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        streamer.unsubscribe(websocket)
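
A loop like start_streaming only does work once something schedules it. Here is a minimal standard-library sketch of running such a loop as a background task and cancelling it cleanly on shutdown. The names produce_readings and run_for are hypothetical, and the loop appends to a list instead of sending to sockets. In a FastAPI application the same start and cancel steps would typically sit in a startup/shutdown or lifespan hook.

```python
import asyncio
import contextlib


async def produce_readings(sink: list, interval: float) -> None:
    """Hypothetical stand-in for a streaming loop such as start_streaming()."""
    count = 0
    while True:
        count += 1
        sink.append(count)
        await asyncio.sleep(interval)


async def run_for(duration: float, interval: float = 0.01) -> list:
    sink = []
    # Schedule the loop in the background and keep a reference to the task
    task = asyncio.create_task(produce_readings(sink, interval))
    try:
        # Stand-in for the period during which the application serves clients
        await asyncio.sleep(duration)
    finally:
        # Cancel the loop and wait for it to finish unwinding
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
    return sink


if __name__ == "__main__":
    print(asyncio.run(run_for(0.05)))
```

Keeping a reference to the task matters: it lets the application cancel the loop on shutdown.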

Best Practices

Key WebSocket implementation principles:

Connection Management:

  • Track active connections properly
  • Clean up dead connections automatically
  • Handle disconnections gracefully

Message Handling:

  • Use JSON for structured messages
  • Implement message types for different actions
  • Validate incoming messages
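
To make the validation point concrete, here is a minimal sketch of a parser for the {"type", "content"} messages used by the basic server. The parse_message function and the ALLOWED_TYPES set are hypothetical names introduced for illustration; a larger application might use a schema library instead.

```python
import json
from typing import Optional

# The two message types handled by the basic server endpoint
ALLOWED_TYPES = {"broadcast", "echo"}


def parse_message(raw: str) -> Optional[dict]:
    """Return a validated message dict, or None if the payload is malformed."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Valid JSON can still be a list, number, or string
    if not isinstance(message, dict):
        return None
    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None
    if not isinstance(message.get("content"), str):
        return None
    return message


if __name__ == "__main__":
    print(parse_message('{"type": "echo", "content": "hi"}'))
    print(parse_message("not json"))
```

Returning None instead of raising keeps the receive loop simple: the endpoint can send an error reply and continue with the next message.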

Error Handling:

  • Implement reconnection logic on client side
  • Use try/except blocks around WebSocket operations
  • Log connection events for debugging
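
Client-side reconnection could be sketched as a retry loop with exponential backoff. The helper names connect_with_retry and make_flaky_connect are hypothetical, and retrying only on OSError is an assumption. Which exceptions are worth retrying depends on the client library in use.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[object]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
) -> object:
    """Call connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return await connect()
        except OSError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            await asyncio.sleep(delay)
    raise ValueError("max_attempts must be at least 1")


def make_flaky_connect(failures: int):
    """Hypothetical connect() that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def connect():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionRefusedError("server not ready")
        return f"connected after {state['calls']} attempts"

    return connect


if __name__ == "__main__":
    result = asyncio.run(
        connect_with_retry(make_flaky_connect(2), base_delay=0.01)
    )
    print(result)
```

Passing the connect step in as a callable keeps the retry policy independent of any particular WebSocket library. Capping the delay avoids unbounded waits after a long outage.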

Performance:

  • Limit message history size
  • Remove inactive connections promptly
  • Use connection pooling for multiple clients
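
One way to limit message history is a bounded deque, which evicts the oldest entry automatically once it is full. This is a minimal sketch; MessageHistory is a hypothetical helper, not part of the chat room code above, and the default limit of 100 is arbitrary.

```python
from collections import deque
from itertools import islice


class MessageHistory:
    """Hypothetical helper that keeps only the most recent `limit` messages."""

    def __init__(self, limit: int = 100):
        self._messages = deque(maxlen=limit)

    def add(self, message: dict) -> None:
        # When the deque is full, the oldest entry is dropped automatically
        self._messages.append(message)

    def recent(self, count: int = 10) -> list:
        # deque does not support slicing, so copy out the tail with islice
        start = max(len(self._messages) - count, 0)
        return list(islice(self._messages, start, None))


if __name__ == "__main__":
    history = MessageHistory(limit=3)
    for n in range(5):
        history.add({"n": n})
    print(history.recent(2))
```

Compared with trimming a plain list, the deque never grows past its limit, so no separate cleanup step is needed.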

Summary

WebSocket implementation essentials:

  • Use FastAPI or similar frameworks for WebSocket servers
  • Implement proper connection management
  • Handle disconnections and errors gracefully
  • Use structured messages with JSON
  • Implement reconnection logic for robust clients
  • Clean up resources properly

WebSockets enable powerful real-time features in async applications, from chat systems to live data streaming.

In Part 19, we’ll explore health checks and monitoring.