Coverage for services/api-gateway/src/websocket_manager.py: 54%

70 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1""" 

2WebSocket connection manager for broadcasting real-time updates to connected clients. 

3""" 

4 

5from typing import Set, Dict, Any 

6from fastapi import WebSocket 

7from datetime import datetime 

8import json 

9import asyncio 

10import logging 

11 

# Module-level logger named after this module; used by ConnectionManager and
# heartbeat_task below for connection, broadcast and heartbeat diagnostics.
logger = logging.getLogger(__name__)

13 

14 

class ConnectionManager:
    """Manages WebSocket connections and broadcasts messages.

    ``active_connections`` holds every accepted socket; ``subscriptions`` maps
    an event name to the set of sockets subscribed to it. A socket whose send
    raises is treated as dead and removed from both structures.
    """

    def __init__(self):
        # Active WebSocket connections
        self.active_connections: Set[WebSocket] = set()
        # Subscription tracking (event -> set of websockets)
        self.subscriptions: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket):
        """Accept and register a new WebSocket connection."""
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(f"✅ WebSocket client connected. Total: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Remove a WebSocket from the active set and from every subscription.

        Safe to call for a socket that is not (or no longer) registered.
        """
        self.active_connections.discard(websocket)
        # Remove from all subscriptions
        for subscribers in self.subscriptions.values():
            subscribers.discard(websocket)
        logger.info(f"❌ WebSocket client disconnected. Total: {len(self.active_connections)}")

    async def send_message(self, websocket: WebSocket, message: Dict[str, Any]):
        """Send a message to a specific WebSocket.

        Any exception raised by the send is logged and the socket is
        disconnected; the exception is not propagated to the caller.
        """
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.error(f"Error sending message to websocket: {e}")
            self.disconnect(websocket)

    @staticmethod
    def _build_message(event: str, data: Any) -> Dict[str, Any]:
        """Build the envelope sent to clients: event name, payload, UTC timestamp."""
        return {
            "event": event,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
        }

    async def _send_to_each(
        self,
        connections: Set[WebSocket],
        message: Dict[str, Any],
        error_prefix: str,
    ) -> None:
        """Send ``message`` to each connection and disconnect those that fail.

        ``error_prefix`` is the text logged in front of the exception when a
        send raises.
        """
        failed = set()
        # Iterate over a snapshot: every ``await`` hands control back to the
        # event loop, where connect()/disconnect() may mutate the live set.
        # Mutating a set while iterating it raises RuntimeError.
        for connection in list(connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"{error_prefix}: {e}")
                failed.add(connection)

        # Clean up disconnected clients
        for connection in failed:
            self.disconnect(connection)

    async def broadcast(self, event: str, data: Any):
        """Broadcast a message to all connected clients.

        Clients that connect while the broadcast is in progress do not receive
        this message. Clients whose send fails are disconnected.
        """
        message = self._build_message(event, data)
        await self._send_to_each(
            self.active_connections, message, "Error broadcasting to websocket"
        )
        logger.debug(f"📡 Broadcasted {event} to {len(self.active_connections)} clients")

    async def broadcast_to_subscribers(self, event: str, data: Any):
        """Broadcast a message only to clients subscribed to a specific event.

        Does nothing when no subscription set exists for ``event``. Clients
        whose send fails are disconnected.
        """
        if event not in self.subscriptions:
            return

        message = self._build_message(event, data)
        # Copy so the count logged below reflects the clients that were
        # targeted, including any that failed and were removed afterwards.
        subscribers = self.subscriptions[event].copy()
        await self._send_to_each(subscribers, message, "Error sending to subscriber")
        logger.debug(f"📡 Sent {event} to {len(subscribers)} subscribers")

    def subscribe(self, websocket: WebSocket, event: str):
        """Subscribe a WebSocket to a specific event."""
        if event not in self.subscriptions:
            self.subscriptions[event] = set()
        self.subscriptions[event].add(websocket)
        logger.debug(f"📝 Client subscribed to {event}")

    def unsubscribe(self, websocket: WebSocket, event: str):
        """Unsubscribe a WebSocket from a specific event.

        A no-op (apart from the debug log) when the event or socket is unknown.
        """
        if event in self.subscriptions:
            self.subscriptions[event].discard(websocket)
        logger.debug(f"📝 Client unsubscribed from {event}")

108 

109 

# Global connection manager instance, created when this module is imported.
manager = ConnectionManager()

112 

113 

async def heartbeat_task(websocket: WebSocket, interval: float = 30):
    """Send a periodic heartbeat message over ``websocket`` until a send fails.

    Args:
        websocket: Connection to send heartbeats on.
        interval: Seconds to wait before each heartbeat. Defaults to 30.

    The loop sleeps first, then sends a ``"pong"`` event with an empty payload
    and a UTC timestamp. Any ``Exception`` from the sleep or the send ends the
    task and is logged at debug level rather than raised.
    """
    try:
        while True:
            await asyncio.sleep(interval)
            await websocket.send_json({
                "event": "pong",
                "data": {},
                "timestamp": datetime.utcnow().isoformat(),
            })
    except Exception as e:
        logger.debug(f"Heartbeat task ended: {e}")