Coverage for services/api-gateway/src/websocket_manager.py: 54%
70 statements
Report generated by coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""
2WebSocket connection manager for broadcasting real-time updates to connected clients.
3"""
5from typing import Set, Dict, Any
6from fastapi import WebSocket
7from datetime import datetime
8import json
9import asyncio
10import logging
# Logger for this module; its name is taken from the module's __name__.
logger: logging.Logger = logging.getLogger(__name__)
class ConnectionManager:
    """Manages WebSocket connections and broadcasts messages.

    Two registries are maintained:

    * ``active_connections`` -- every socket accepted through :meth:`connect`.
    * ``subscriptions`` -- event name -> set of sockets added through
      :meth:`subscribe`.

    Any socket whose ``send_json`` raises while a message is being delivered
    is removed from both registries via :meth:`disconnect`.
    """

    def __init__(self):
        # Active WebSocket connections
        self.active_connections: Set[WebSocket] = set()
        # Subscription tracking (event -> set of websockets)
        self.subscriptions: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket):
        """Accept and register a new WebSocket connection."""
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(f"✅ WebSocket client connected. Total: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Remove a WebSocket from the active set and from every subscription.

        Safe to call more than once for the same socket: ``discard`` is a
        no-op when the socket is already gone.
        """
        self.active_connections.discard(websocket)
        # Remove from all subscriptions
        for subscribers in self.subscriptions.values():
            subscribers.discard(websocket)
        logger.info(f"❌ WebSocket client disconnected. Total: {len(self.active_connections)}")

    async def send_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """Send a message to a specific WebSocket.

        A failed send is logged and the socket is disconnected; the exception
        is not re-raised to the caller.
        """
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.error(f"Error sending message to websocket: {e}")
            self.disconnect(websocket)

    @staticmethod
    def _build_message(event: str, data: Any) -> Dict[str, Any]:
        """Wrap ``data`` in the ``event``/``data``/``timestamp`` envelope."""
        return {
            "event": event,
            "data": data,
            # Naive UTC timestamp in ISO-8601 form (no offset suffix).
            "timestamp": datetime.utcnow().isoformat(),
        }

    async def _deliver(self, recipients: Set[WebSocket], message: Dict[str, Any],
                       failure_note: str) -> None:
        """Send ``message`` to each recipient and drop the ones that fail.

        ``recipients`` must be a snapshot that no other coroutine mutates:
        every ``await`` below yields to the event loop, and the live
        registries can change while this coroutine is suspended.
        """
        failed = set()
        for connection in recipients:
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"{failure_note}: {e}")
                failed.add(connection)

        # Clean up only after the loop so delivery to healthy sockets is not
        # interleaved with registry mutation.
        for connection in failed:
            self.disconnect(connection)

    async def broadcast(self, event: str, data: Any):
        """Broadcast a message to all connected clients.

        Iterates over a copy of ``active_connections``. Iterating the live set
        would raise ``RuntimeError: Set changed size during iteration`` if a
        client connected or disconnected while a send was awaiting.
        """
        message = self._build_message(event, data)
        await self._deliver(
            set(self.active_connections),
            message,
            "Error broadcasting to websocket",
        )
        logger.debug(f"📡 Broadcasted {event} to {len(self.active_connections)} clients")

    async def broadcast_to_subscribers(self, event: str, data: Any):
        """Broadcast a message only to clients subscribed to a specific event.

        Returns immediately when nobody has ever subscribed to ``event``.
        """
        if event not in self.subscriptions:
            return

        message = self._build_message(event, data)
        subscribers = self.subscriptions[event].copy()
        await self._deliver(subscribers, message, "Error sending to subscriber")
        # Count reflects the snapshot taken before sending, failures included.
        logger.debug(f"📡 Sent {event} to {len(subscribers)} subscribers")

    def subscribe(self, websocket: WebSocket, event: str):
        """Subscribe a WebSocket to a specific event."""
        if event not in self.subscriptions:
            self.subscriptions[event] = set()
        self.subscriptions[event].add(websocket)
        logger.debug(f"📝 Client subscribed to {event}")

    def unsubscribe(self, websocket: WebSocket, event: str):
        """Unsubscribe a WebSocket from a specific event.

        Unknown events and sockets that were never subscribed are ignored.
        """
        if event in self.subscriptions:
            self.subscriptions[event].discard(websocket)
            logger.debug(f"📝 Client unsubscribed from {event}")
# Module-wide ConnectionManager instance, created once when this module is imported.
manager: ConnectionManager = ConnectionManager()
async def heartbeat_task(websocket: WebSocket, interval: float = 30):
    """Send a periodic heartbeat message to keep the connection alive.

    Sleeps ``interval`` seconds, sends a ``"pong"`` event with an empty
    payload and a UTC timestamp, and repeats until sending (or sleeping)
    raises.

    Args:
        websocket: Socket to send heartbeats on; only ``send_json`` is used.
        interval: Seconds to wait before each heartbeat. Defaults to 30.

    The coroutine never returns normally. It ends when an ``Exception`` is
    raised inside the loop, which is logged at debug level and swallowed.
    On Python 3.8+ ``asyncio.CancelledError`` is not an ``Exception``
    subclass, so cancelling the task still propagates.
    """
    try:
        while True:
            # Sleep first: the first heartbeat goes out one interval after start.
            await asyncio.sleep(interval)
            await websocket.send_json({
                "event": "pong",
                "data": {},
                "timestamp": datetime.utcnow().isoformat()
            })
    except Exception as e:
        logger.debug(f"Heartbeat task ended: {e}")