Coverage for services/rf-acquisition/src/routers/acquisition.py: 48%
92 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""FastAPI endpoints for RF acquisition."""
3import logging
4from datetime import datetime
5from typing import Optional
6from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
7from celery.result import AsyncResult
9from ..models.websdrs import (
10 AcquisitionRequest,
11 AcquisitionTaskResponse,
12 AcquisitionStatusResponse,
13 WebSDRConfig,
14)
15from ..tasks.acquire_iq import acquire_iq, health_check_websdrs
17logger = logging.getLogger(__name__)
19router = APIRouter(prefix="/api/v1/acquisition", tags=["acquisition"])
22# WebSDRs - Northwestern Italy (Piedmont & Liguria regions)
23# Source: WEBSDRS.md - Strategic network for triangulation in Northern Italy
24DEFAULT_WEBSDRS = [
25 {
26 "id": 1,
27 "name": "Aquila di Giaveno",
28 "url": "http://sdr1.ik1jns.it:8076/",
29 "location_name": "Giaveno, Italy",
30 "latitude": 45.02,
31 "longitude": 7.29,
32 "is_active": True,
33 "timeout_seconds": 30,
34 "retry_count": 3
35 },
36 {
37 "id": 2,
38 "name": "Montanaro",
39 "url": "http://cbfenis.ddns.net:43510/",
40 "location_name": "Montanaro, Italy",
41 "latitude": 45.234,
42 "longitude": 7.857,
43 "is_active": True,
44 "timeout_seconds": 30,
45 "retry_count": 3
46 },
47 {
48 "id": 3,
49 "name": "Torino",
50 "url": "http://vst-aero.it:8073/",
51 "location_name": "Torino, Italy",
52 "latitude": 45.044,
53 "longitude": 7.672,
54 "is_active": True,
55 "timeout_seconds": 30,
56 "retry_count": 3
57 },
58 {
59 "id": 4,
60 "name": "Coazze",
61 "url": "http://94.247.189.130:8076/",
62 "location_name": "Coazze, Italy",
63 "latitude": 45.03,
64 "longitude": 7.27,
65 "is_active": True,
66 "timeout_seconds": 30,
67 "retry_count": 3
68 },
69 {
70 "id": 5,
71 "name": "Passo del Giovi",
72 "url": "http://iz1mlt.ddns.net:8074/",
73 "location_name": "Passo del Giovi, Italy",
74 "latitude": 44.561,
75 "longitude": 8.956,
76 "is_active": True,
77 "timeout_seconds": 30,
78 "retry_count": 3
79 },
80 {
81 "id": 6,
82 "name": "Genova",
83 "url": "http://iq1zw.ddns.net:42154/",
84 "location_name": "Genova, Italy",
85 "latitude": 44.395,
86 "longitude": 8.956,
87 "is_active": True,
88 "timeout_seconds": 30,
89 "retry_count": 3
90 },
91 {
92 "id": 7,
93 "name": "Milano - Baggio",
94 "url": "http://iu2mch.duckdns.org:8073/",
95 "location_name": "Milano (Baggio), Italy",
96 "latitude": 45.478,
97 "longitude": 9.123,
98 "is_active": True,
99 "timeout_seconds": 30,
100 "retry_count": 3
101 },
102]
105def get_websdrs_config() -> list[dict]:
106 """Get WebSDR configuration."""
107 # TODO: Load from database
108 return DEFAULT_WEBSDRS
111@router.post("/acquire", response_model=AcquisitionTaskResponse)
112async def trigger_acquisition(
113 request: AcquisitionRequest,
114 background_tasks: BackgroundTasks,
115):
116 """
117 Trigger an IQ data acquisition from WebSDR receivers.
119 Returns:
120 Task ID and initial status
121 """
122 try:
123 logger.info(
124 "Triggering acquisition at %.2f MHz for %.1f seconds",
125 request.frequency_mhz,
126 request.duration_seconds
127 )
129 # Get WebSDR configs
130 websdrs_config = get_websdrs_config()
132 # Filter to requested receivers if specified
133 if request.websdrs:
134 websdrs_config = [
135 w for w in websdrs_config if w['id'] in request.websdrs
136 ]
138 if not websdrs_config:
139 raise HTTPException(
140 status_code=400,
141 detail="No active WebSDRs available for acquisition"
142 )
144 # Queue Celery task
145 task = acquire_iq.delay(
146 frequency_mhz=request.frequency_mhz,
147 duration_seconds=request.duration_seconds,
148 start_time_iso=request.start_time.isoformat(),
149 websdrs_config_list=websdrs_config,
150 sample_rate_khz=12.5,
151 )
153 logger.info("Queued acquisition task: %s", task.id)
155 return AcquisitionTaskResponse(
156 task_id=str(task.id),
157 status="PENDING",
158 message=f"Acquisition task queued for {len(websdrs_config)} WebSDR receivers",
159 frequency_mhz=request.frequency_mhz,
160 websdrs_count=len(websdrs_config)
161 )
163 except Exception as e:
164 logger.exception("Error triggering acquisition: %s", str(e))
165 raise HTTPException(
166 status_code=500,
167 detail=f"Error triggering acquisition: {str(e)}"
168 )
171@router.get("/status/{task_id}", response_model=AcquisitionStatusResponse)
172async def get_acquisition_status(task_id: str):
173 """
174 Get status of an ongoing acquisition task.
176 Args:
177 task_id: Celery task ID
179 Returns:
180 Task status and progress information
181 """
182 try:
183 result = AsyncResult(task_id)
185 status_map = {
186 'PENDING': 'PENDING',
187 'STARTED': 'PROGRESS',
188 'PROGRESS': 'PROGRESS',
189 'SUCCESS': 'SUCCESS',
190 'FAILURE': 'FAILURE',
191 'REVOKED': 'REVOKED',
192 'RETRY': 'PROGRESS',
193 }
195 mapped_status = status_map.get(result.state, result.state)
197 # Extract progress info
198 if result.state == 'PROGRESS':
199 info = result.info if isinstance(result.info, dict) else {}
200 progress = info.get('progress', 0)
201 message = info.get('status', 'Processing...')
202 measurements_collected = info.get('successful', 0)
203 elif result.state == 'SUCCESS':
204 info = result.result if isinstance(result.result, dict) else {}
205 progress = 100
206 message = "Acquisition complete"
207 measurements_collected = info.get('measurements_count', 0)
208 else:
209 info = {}
210 progress = 0 if result.state == 'PENDING' else 50
211 message = f"Task state: {result.state}"
212 measurements_collected = 0
214 return AcquisitionStatusResponse(
215 task_id=task_id,
216 status=mapped_status,
217 progress=progress,
218 message=message,
219 measurements_collected=measurements_collected,
220 errors=info.get('errors', None),
221 result=result.result if result.state == 'SUCCESS' else None
222 )
224 except Exception as e:
225 logger.exception("Error getting acquisition status: %s", str(e))
226 raise HTTPException(
227 status_code=500,
228 detail=f"Error retrieving status: {str(e)}"
229 )
232@router.get("/websdrs", response_model=list[dict])
233async def list_websdrs():
234 """
235 List all configured WebSDR receivers.
237 Returns:
238 List of WebSDR configurations
239 """
240 return get_websdrs_config()
243@router.get("/websdrs/health")
244async def check_websdrs_health():
245 """
246 Check health status of all WebSDR receivers with metrics.
248 Returns:
249 Dict mapping WebSDR ID to health status with SNR, uptime, last contact
250 """
251 try:
252 from ..storage.db_manager import DatabaseManager
253 from ..tasks.uptime_monitor import calculate_uptime_percentage
255 # Get WebSDR configs
256 websdrs_config = get_websdrs_config()
257 logger.debug(f"WebSDR configs loaded: {len(websdrs_config)} SDRs")
259 # Run health check task
260 task = health_check_websdrs.delay()
261 logger.debug(f"Health check task submitted: {task.id}")
263 result = task.get(timeout=60)
264 logger.info(f"Health check task result: {result}")
266 # Get SNR statistics from database
267 db_manager = DatabaseManager()
268 snr_stats = db_manager.get_snr_statistics() if db_manager else {}
269 logger.debug(f"SNR statistics retrieved: {len(snr_stats) if snr_stats else 0} entries")
271 # Format response with detailed status information
272 health_status = {}
273 check_time = datetime.utcnow().isoformat()
275 for ws_config in websdrs_config:
276 ws_id = ws_config['id']
277 # IMPORTANT: Celery serializes dict keys as strings!
278 is_online = result.get(str(ws_id), False)
280 # Get SNR and uptime from stats
281 sdr_stats = snr_stats.get(ws_id, {}) if snr_stats else {}
282 avg_snr = sdr_stats.get('avg_snr_db', None)
284 # Calculate uptime from database history (last 24 hours)
285 uptime = calculate_uptime_percentage(ws_id, hours=24)
287 health_status[ws_id] = {
288 'websdr_id': ws_id,
289 'name': ws_config['name'],
290 'status': 'online' if is_online else 'offline',
291 'last_check': check_time,
292 'uptime': round(uptime, 1),
293 'avg_snr': round(avg_snr, 2) if avg_snr is not None else None,
294 }
296 if not is_online:
297 health_status[ws_id]['error_message'] = 'Health check failed or timed out'
299 logger.info(f"Health check response ready with metrics")
300 return health_status
302 except Exception as e:
303 logger.exception("Error checking WebSDR health: %s", str(e))
305 # Return offline status for all WebSDRs on error
306 websdrs_config = get_websdrs_config()
307 check_time = datetime.utcnow().isoformat()
309 health_status = {}
310 for ws_config in websdrs_config:
311 health_status[ws_config['id']] = {
312 'websdr_id': ws_config['id'],
313 'name': ws_config['name'],
314 'status': 'offline',
315 'last_check': check_time,
316 'error_message': f'Health check error: {str(e)}'
317 }
319 return health_status
322@router.get("/config")
323async def get_configuration():
324 """
325 Get acquisition service configuration.
327 Returns:
328 Service configuration details
329 """
330 return {
331 'service': 'rf-acquisition',
332 'version': '0.1.0',
333 'capabilities': [
334 'simultaneous-acquisition',
335 'iq-processing',
336 'signal-metrics'
337 ],
338 'websdrs_count': len(get_websdrs_config()),
339 'max_duration_seconds': 300,
340 'default_sample_rate_khz': 12.5,
341 }