Coverage for services/rf-acquisition/src/tasks/uptime_monitor.py: 18%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""Monitor WebSDR uptime and record to database.""" 

2 

3import logging 

4from datetime import datetime 

5from typing import List, Dict 

6import asyncio 

7 

8from celery import shared_task 

9from sqlalchemy import insert 

10 

11try: 

12 from ..config import settings 

13 from ..storage.db_manager import DatabaseManager 

14 from ..fetchers.websdr_fetcher import WebSDRFetcher 

15 from ..models.db import Base 

16except ImportError: 

17 # Fallback for direct execution 

18 from config import settings 

19 from storage.db_manager import DatabaseManager 

20 from fetchers.websdr_fetcher import WebSDRFetcher 

21 

22logger = logging.getLogger(__name__) 

23 

24 

# DDL for the heimdall.websdrs_uptime_history table (raw SQL approach).
#
# The script creates the table, converts it into a TimescaleDB hypertable
# partitioned on "timestamp", and adds a (websdr_id, timestamp DESC) index.
#
# TimescaleDB requires every unique index / primary key on a hypertable to
# include the partitioning column, so the primary key is (id, timestamp);
# a primary key on "id" alone makes create_hypertable() fail.
# NOTE(review): uuid_generate_v4() presumably relies on the uuid-ossp
# extension being installed in the target database - confirm.
UPTIME_HISTORY_TABLE = """
CREATE TABLE IF NOT EXISTS heimdall.websdrs_uptime_history (
    id UUID NOT NULL DEFAULT uuid_generate_v4(),
    websdr_id INT NOT NULL,
    websdr_name VARCHAR(255),
    status VARCHAR(20) NOT NULL CHECK (status IN ('online', 'offline')),
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (id, timestamp)
);

SELECT create_hypertable(
    'heimdall.websdrs_uptime_history',
    'timestamp',
    if_not_exists => TRUE
);

CREATE INDEX IF NOT EXISTS idx_websdrs_uptime_history_websdr_time
ON heimdall.websdrs_uptime_history(websdr_id, timestamp DESC);
"""

45 

46 

def get_default_websdrs() -> List[Dict]:
    """Return the built-in list of WebSDR receiver configurations.

    Each entry is a freshly built dict with the keys ``id``, ``name``,
    ``url``, ``location_name``, ``latitude``, ``longitude``, ``is_active``,
    ``timeout_seconds`` and ``retry_count``. Every receiver is marked active
    with a 3 second timeout and a single retry.
    """
    # (id, name, url, location_name, latitude, longitude)
    receivers = (
        (1, "Aquila di Giaveno", "http://sdr1.ik1jns.it:8076/", "Giaveno, Italy", 45.02, 7.29),
        (2, "Montanaro", "http://cbfenis.ddns.net:43510/", "Montanaro, Italy", 45.234, 7.857),
        (3, "Torino", "http://vst-aero.it:8073/", "Torino, Italy", 45.044, 7.672),
        (4, "Coazze", "http://94.247.189.130:8076/", "Coazze, Italy", 45.03, 7.27),
        (5, "Passo del Giovi", "http://iz1mlt.ddns.net:8074/", "Passo del Giovi, Italy", 44.561, 8.956),
        (6, "Genova", "http://iq1zw.ddns.net:42154/", "Genova, Italy", 44.395, 8.956),
        (7, "Milano - Baggio", "http://iu2mch.duckdns.org:8073/", "Milano (Baggio), Italy", 45.478, 9.123),
    )

    return [
        {
            "id": sdr_id,
            "name": name,
            "url": url,
            "location_name": location,
            "latitude": lat,
            "longitude": lon,
            "is_active": True,
            "timeout_seconds": 3,
            "retry_count": 1,
        }
        for sdr_id, name, url, location, lat, lon in receivers
    ]

128 

129 

async def _check_websdrs_health(websdrs: List[Dict]) -> Dict[int, bool]:
    """Probe every configured WebSDR concurrently.

    Each receiver gets an HTTP HEAD request (redirects are not followed); when
    the server answers 501 (HEAD not supported) a GET is tried instead. A
    receiver counts as online when the final status code is in [200, 400).
    Any exception raised while probing marks that receiver offline.

    Args:
        websdrs: Receiver configs with at least ``id``, ``url`` and ``name``;
            an optional ``timeout_seconds`` sets the per-request total timeout
            (defaults to 3 seconds).

    Returns:
        Mapping of WebSDR id to True (online) / False (offline).
    """
    import aiohttp  # local import, as in the original task body

    async def probe(session, ws: Dict) -> bool:
        ws_id = ws['id']
        ws_url = ws['url']
        ws_name = ws['name']
        # Honour the per-receiver timeout; 3 s was the former hard-coded value.
        timeout = aiohttp.ClientTimeout(total=ws.get('timeout_seconds', 3))

        try:
            async with session.head(ws_url, timeout=timeout, allow_redirects=False) as resp:
                method, http_status = 'HEAD', resp.status
                # 501 means HEAD not supported, try GET
                if http_status == 501:
                    async with session.get(ws_url, timeout=timeout, allow_redirects=False) as resp2:
                        method, http_status = 'GET', resp2.status
        except Exception as e:
            logger.warning(f" {ws_name} ({ws_id}): Exception {type(e).__name__} → offline")
            return False

        is_online = 200 <= http_status < 400
        logger.debug(f" {ws_name} ({ws_id}): {method} {http_status} → {'online' if is_online else 'offline'}")
        return is_online

    async with aiohttp.ClientSession() as session:
        # Run all probes at once so one slow receiver does not delay the rest.
        outcomes = await asyncio.gather(*(probe(session, ws) for ws in websdrs))

    return {ws['id']: online for ws, online in zip(websdrs, outcomes)}


@shared_task(bind=True, name='monitor_websdrs_uptime')
def monitor_websdrs_uptime(self):
    """
    Celery task: Check WebSDR health and record status to database.

    Meant to run periodically (every minute according to the original author's
    note; the schedule itself is configured elsewhere) to build uptime history.

    Returns:
        Dict with ``status`` ('success'), ``records_inserted`` (number of rows
        written) and ``timestamp`` (naive UTC time in ISO format).

    Raises:
        Exception: any error outside the per-row insert handling is logged
            and re-raised so Celery marks the task as failed.
    """
    try:
        logger.info("Starting WebSDR uptime monitoring task")

        # Get WebSDR configs as dict
        websdrs = get_default_websdrs()

        # Direct health check without WebSDRFetcher (to avoid object attribute issues)
        logger.info(f"Checking health of {len(websdrs)} WebSDRs")

        # Use a private event loop: asyncio.get_event_loop() is deprecated
        # outside a running loop and fails if the thread's loop is closed or
        # missing. A dedicated loop also leaves any ambient loop untouched.
        loop = asyncio.new_event_loop()
        try:
            health_results = loop.run_until_complete(_check_websdrs_health(websdrs))
        finally:
            loop.close()

        logger.info(f"Health check results: {health_results}")

        from sqlalchemy import text

        # Raw SQL insert for uptime history using text(); built once, reused per row.
        insert_query = text("""
            INSERT INTO heimdall.websdrs_uptime_history
            (websdr_id, websdr_name, status, timestamp)
            VALUES (:websdr_id, :websdr_name, :status, :timestamp)
        """)

        # Get database connection
        db_manager = DatabaseManager()

        # Record status for each WebSDR
        records_inserted = 0
        with db_manager.get_session() as session:
            for ws_config in websdrs:
                ws_id = ws_config['id']
                ws_name = ws_config['name']

                # Get online/offline status
                is_online = health_results.get(ws_id, False)
                status = 'online' if is_online else 'offline'
                # NOTE(review): naive UTC, kept so rows stay consistent with the
                # naive cutoff used when reading; consider migrating both sides
                # to timezone-aware datetimes together.
                timestamp = datetime.utcnow()

                try:
                    # SAVEPOINT per row: a failed INSERT is rolled back on its
                    # own instead of aborting the whole transaction, so the
                    # remaining rows still get written and the counter is exact.
                    # Assumes get_session() yields a SQLAlchemy Session.
                    with session.begin_nested():
                        session.execute(
                            insert_query,
                            {
                                'websdr_id': ws_id,
                                'websdr_name': ws_name,
                                'status': status,
                                'timestamp': timestamp,
                            },
                        )
                except Exception as e:
                    logger.error(f"Failed to record uptime for {ws_name}: {e}")
                    continue

                records_inserted += 1
                logger.debug(f"Recorded {ws_name} ({ws_id}): {status}")

            session.commit()

        logger.info(f"Uptime monitoring complete: {records_inserted} records inserted")

        return {
            'status': 'success',
            'records_inserted': records_inserted,
            'timestamp': datetime.utcnow().isoformat()
        }

    except Exception as e:
        logger.exception(f"Error in uptime monitoring: {e}")
        raise

233 

234 

def calculate_uptime_percentage(websdr_id: int, hours: int = 24) -> float:
    """
    Compute how often a WebSDR was seen online during the last N hours.

    Counts the rows of heimdall.websdrs_uptime_history for the given receiver
    whose timestamp is at or after (now - hours), grouped by status, and
    returns the share of 'online' rows.

    Args:
        websdr_id: ID of the WebSDR
        hours: Time window in hours (default: 24)

    Returns:
        Uptime percentage (0-100). 0.0 is also returned when no checks exist
        in the window or when any error occurs (the error is logged).
    """
    try:
        from datetime import timedelta
        from sqlalchemy import text

        manager = DatabaseManager()

        with manager.get_session() as db:
            # NOTE(review): naive UTC cutoff - matches the naive timestamps the
            # monitoring task writes; confirm the DB session time zone is UTC.
            window_start = datetime.utcnow() - timedelta(hours=hours)

            # Per-status row counts inside the window (raw SQL via text()).
            counts_query = text("""
                SELECT status, COUNT(*) as count
                FROM heimdall.websdrs_uptime_history
                WHERE websdr_id = :websdr_id
                AND timestamp >= :cutoff_time
                GROUP BY status
            """)
            bind_params = {'websdr_id': websdr_id, 'cutoff_time': window_start}
            rows = db.execute(counts_query, bind_params)

            per_status = dict((row[0], row[1]) for row in rows)
            n_checks = sum(per_status.values())

            if n_checks != 0:
                n_online = per_status.get('online', 0)
                percentage = (n_online / n_checks) * 100

                logger.debug(
                    f"SDR {websdr_id}: {n_online}/{n_checks} online over {hours}h = {percentage:.1f}%"
                )

                return percentage

            # No samples recorded in the window.
            return 0.0

    except Exception as e:
        logger.error(f"Error calculating uptime for SDR {websdr_id}: {e}")
        return 0.0