Coverage for services/rf-acquisition/src/routers/acquisition.py: 48%

92 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""FastAPI endpoints for RF acquisition.""" 

2 

3import logging 

4from datetime import datetime 

5from typing import Optional 

6from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks 

7from celery.result import AsyncResult 

8 

9from ..models.websdrs import ( 

10 AcquisitionRequest, 

11 AcquisitionTaskResponse, 

12 AcquisitionStatusResponse, 

13 WebSDRConfig, 

14) 

15from ..tasks.acquire_iq import acquire_iq, health_check_websdrs 

16 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that groups every endpoint in this module under one URL prefix
# and one OpenAPI tag.
router = APIRouter(
    prefix="/api/v1/acquisition",
    tags=["acquisition"],
)

20 

21 

# WebSDRs - Northwestern Italy (Piedmont & Liguria regions)
# Source: WEBSDRS.md - Strategic network for triangulation in Northern Italy
def _websdr_entry(websdr_id, name, url, location_name, latitude, longitude):
    """Build one receiver record.

    Every receiver in the default network shares the same activity flag,
    timeout and retry count, so only the per-receiver fields are parameters.
    Key order is kept stable because the records are returned as-is by the
    API endpoints.
    """
    return {
        "id": websdr_id,
        "name": name,
        "url": url,
        "location_name": location_name,
        "latitude": latitude,
        "longitude": longitude,
        "is_active": True,
        "timeout_seconds": 30,
        "retry_count": 3,
    }


DEFAULT_WEBSDRS = [
    _websdr_entry(
        1, "Aquila di Giaveno", "http://sdr1.ik1jns.it:8076/",
        "Giaveno, Italy", 45.02, 7.29,
    ),
    _websdr_entry(
        2, "Montanaro", "http://cbfenis.ddns.net:43510/",
        "Montanaro, Italy", 45.234, 7.857,
    ),
    _websdr_entry(
        3, "Torino", "http://vst-aero.it:8073/",
        "Torino, Italy", 45.044, 7.672,
    ),
    _websdr_entry(
        4, "Coazze", "http://94.247.189.130:8076/",
        "Coazze, Italy", 45.03, 7.27,
    ),
    _websdr_entry(
        5, "Passo del Giovi", "http://iz1mlt.ddns.net:8074/",
        "Passo del Giovi, Italy", 44.561, 8.956,
    ),
    _websdr_entry(
        6, "Genova", "http://iq1zw.ddns.net:42154/",
        "Genova, Italy", 44.395, 8.956,
    ),
    _websdr_entry(
        7, "Milano - Baggio", "http://iu2mch.duckdns.org:8073/",
        "Milano (Baggio), Italy", 45.478, 9.123,
    ),
]

103 

104 

def get_websdrs_config() -> list[dict]:
    """Return the WebSDR receiver definitions used by this service.

    The module-level ``DEFAULT_WEBSDRS`` list is handed back directly
    (the same object, not a copy).
    """
    # TODO: Load from database
    websdrs = DEFAULT_WEBSDRS
    return websdrs

109 

110 

@router.post("/acquire", response_model=AcquisitionTaskResponse)
async def trigger_acquisition(
    request: AcquisitionRequest,
    background_tasks: BackgroundTasks,
):
    """
    Trigger an IQ data acquisition from WebSDR receivers.

    Args:
        request: Acquisition parameters. ``frequency_mhz``,
            ``duration_seconds`` and ``start_time`` are forwarded to the
            Celery task; a non-empty ``websdrs`` restricts the acquisition
            to the receivers whose ``id`` it contains.
        background_tasks: Injected by FastAPI; not used by this handler.

    Returns:
        Task ID and initial status

    Raises:
        HTTPException: 400 when no configured receiver matches the request,
            500 when queuing the task fails for any other reason.
    """
    try:
        logger.info(
            "Triggering acquisition at %.2f MHz for %.1f seconds",
            request.frequency_mhz,
            request.duration_seconds
        )

        # Get WebSDR configs
        websdrs_config = get_websdrs_config()

        # Filter to requested receivers if specified
        if request.websdrs:
            websdrs_config = [
                w for w in websdrs_config if w['id'] in request.websdrs
            ]

        if not websdrs_config:
            raise HTTPException(
                status_code=400,
                detail="No active WebSDRs available for acquisition"
            )

        # Queue Celery task
        task = acquire_iq.delay(
            frequency_mhz=request.frequency_mhz,
            duration_seconds=request.duration_seconds,
            start_time_iso=request.start_time.isoformat(),
            websdrs_config_list=websdrs_config,
            sample_rate_khz=12.5,
        )

        logger.info("Queued acquisition task: %s", task.id)

        return AcquisitionTaskResponse(
            task_id=str(task.id),
            status="PENDING",
            message=f"Acquisition task queued for {len(websdrs_config)} WebSDR receivers",
            frequency_mhz=request.frequency_mhz,
            websdrs_count=len(websdrs_config)
        )

    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client
        # unchanged; without this clause the generic handler below would
        # catch them and turn them into a 500.
        raise
    except Exception as e:
        logger.exception("Error triggering acquisition: %s", str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Error triggering acquisition: {str(e)}"
        ) from e

169 

170 

@router.get("/status/{task_id}", response_model=AcquisitionStatusResponse)
async def get_acquisition_status(task_id: str):
    """
    Get status of an ongoing acquisition task.

    Args:
        task_id: Celery task ID

    Returns:
        Task status and progress information

    Raises:
        HTTPException: 500 when the task state cannot be read or the
            response cannot be built.
    """
    try:
        result = AsyncResult(task_id)

        # Read the state exactly once. Each access to ``result.state`` may
        # re-query the result backend, so repeated reads could observe a
        # state change mid-request and yield an inconsistent response
        # (e.g. status PROGRESS together with progress 100).
        state = result.state

        # Celery state -> state name exposed by this API; unknown states
        # are passed through unchanged.
        status_map = {
            'PENDING': 'PENDING',
            'STARTED': 'PROGRESS',
            'PROGRESS': 'PROGRESS',
            'SUCCESS': 'SUCCESS',
            'FAILURE': 'FAILURE',
            'REVOKED': 'REVOKED',
            'RETRY': 'PROGRESS',
        }
        mapped_status = status_map.get(state, state)

        final_result = None

        # Extract progress info
        if state == 'PROGRESS':
            # Progress metadata published by the task, if it is a dict.
            meta = result.info
            info = meta if isinstance(meta, dict) else {}
            progress = info.get('progress', 0)
            message = info.get('status', 'Processing...')
            measurements_collected = info.get('successful', 0)
        elif state == 'SUCCESS':
            # Fetch the return value once and reuse it for both the
            # summary fields and the ``result`` field of the response.
            final_result = result.result
            info = final_result if isinstance(final_result, dict) else {}
            progress = 100
            message = "Acquisition complete"
            measurements_collected = info.get('measurements_count', 0)
        else:
            info = {}
            progress = 0 if state == 'PENDING' else 50
            message = f"Task state: {state}"
            measurements_collected = 0

        return AcquisitionStatusResponse(
            task_id=task_id,
            status=mapped_status,
            progress=progress,
            message=message,
            measurements_collected=measurements_collected,
            errors=info.get('errors', None),
            result=final_result
        )

    except Exception as e:
        logger.exception("Error getting acquisition status: %s", str(e))
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving status: {str(e)}"
        )

230 

231 

@router.get("/websdrs", response_model=list[dict])
async def list_websdrs():
    """
    Return every WebSDR receiver known to the service.

    Returns:
        The list produced by ``get_websdrs_config()``, unmodified
    """
    configured_receivers = get_websdrs_config()
    return configured_receivers

241 

242 

@router.get("/websdrs/health")
async def check_websdrs_health():
    """
    Check health status of all WebSDR receivers with metrics.

    Submits the ``health_check_websdrs`` Celery task, waits up to 60 seconds
    for its result, and combines it with SNR statistics and 24-hour uptime
    for each configured receiver.

    Returns:
        Dict mapping WebSDR ID to health status with SNR, uptime, last contact.
        If anything fails, every receiver is reported as offline with an
        ``error_message`` describing the failure (no exception is raised).
    """
    try:
        import asyncio

        from ..storage.db_manager import DatabaseManager
        from ..tasks.uptime_monitor import calculate_uptime_percentage

        # Get WebSDR configs
        websdrs_config = get_websdrs_config()
        logger.debug("WebSDR configs loaded: %s SDRs", len(websdrs_config))

        # Run health check task
        task = health_check_websdrs.delay()
        logger.debug("Health check task submitted: %s", task.id)

        # ``task.get`` blocks until the worker answers (up to 60 s). Run it
        # in a worker thread so the event loop keeps serving other requests
        # while this one waits.
        result = await asyncio.to_thread(task.get, timeout=60)
        logger.info("Health check task result: %s", result)

        # Get SNR statistics from database
        # NOTE(review): these database helpers look synchronous as well;
        # confirm whether they should also be moved off the event loop.
        db_manager = DatabaseManager()
        snr_stats = db_manager.get_snr_statistics() if db_manager else {}
        logger.debug(
            "SNR statistics retrieved: %s entries",
            len(snr_stats) if snr_stats else 0,
        )

        # Format response with detailed status information
        health_status = {}
        check_time = datetime.utcnow().isoformat()

        for ws_config in websdrs_config:
            ws_id = ws_config['id']
            # IMPORTANT: Celery serializes dict keys as strings!
            is_online = result.get(str(ws_id), False)

            # Get SNR and uptime from stats
            sdr_stats = snr_stats.get(ws_id, {}) if snr_stats else {}
            avg_snr = sdr_stats.get('avg_snr_db', None)

            # Calculate uptime from database history (last 24 hours)
            uptime = calculate_uptime_percentage(ws_id, hours=24)

            health_status[ws_id] = {
                'websdr_id': ws_id,
                'name': ws_config['name'],
                'status': 'online' if is_online else 'offline',
                'last_check': check_time,
                'uptime': round(uptime, 1),
                'avg_snr': round(avg_snr, 2) if avg_snr is not None else None,
            }

            if not is_online:
                health_status[ws_id]['error_message'] = 'Health check failed or timed out'

        logger.info("Health check response ready with metrics")
        return health_status

    except Exception as e:
        logger.exception("Error checking WebSDR health: %s", str(e))

        # Return offline status for all WebSDRs on error
        websdrs_config = get_websdrs_config()
        check_time = datetime.utcnow().isoformat()

        health_status = {}
        for ws_config in websdrs_config:
            health_status[ws_config['id']] = {
                'websdr_id': ws_config['id'],
                'name': ws_config['name'],
                'status': 'offline',
                'last_check': check_time,
                'error_message': f'Health check error: {str(e)}'
            }

        return health_status

320 

321 

@router.get("/config")
async def get_configuration():
    """
    Describe the acquisition service.

    Returns:
        Dict with the service name, version, capability list, the number of
        configured WebSDR receivers, and the duration / sample-rate settings
    """
    capabilities = [
        'simultaneous-acquisition',
        'iq-processing',
        'signal-metrics',
    ]
    receiver_total = len(get_websdrs_config())

    configuration = {
        'service': 'rf-acquisition',
        'version': '0.1.0',
        'capabilities': capabilities,
        'websdrs_count': receiver_total,
        'max_duration_seconds': 300,
        'default_sample_rate_khz': 12.5,
    }
    return configuration