Coverage for services/rf-acquisition/src/tasks/acquire_iq.py: 19%
145 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""Celery tasks for IQ data acquisition."""
3import logging
4import json
5from datetime import datetime
6from typing import Dict, List, Optional
7from celery import shared_task, Task
8from celery.utils.log import get_task_logger
9import numpy as np
11from ..models.websdrs import WebSDRConfig, MeasurementRecord, AcquisitionRequest
12from ..fetchers.websdr_fetcher import WebSDRFetcher
13from ..processors.iq_processor import IQProcessor
14from ..storage.minio_client import MinIOClient
15from ..config import settings
17logger = get_task_logger(__name__)
class AcquisitionTask(Task):
    """Base Celery task carrying the shared retry policy for acquisitions.

    Any exception raised by the task body triggers an automatic retry,
    up to 3 attempts, using jittered exponential backoff capped at
    600 seconds between attempts.
    """

    # Retry on any exception raised by the task body.
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}

    # Exponential backoff between retries, randomized (jitter) and
    # capped at 10 minutes.
    retry_backoff = True
    retry_jitter = True
    retry_backoff_max = 600
@shared_task(bind=True, base=AcquisitionTask)
def acquire_iq(
    self,
    frequency_mhz: float,
    duration_seconds: float,
    start_time_iso: str,
    websdrs_config_list: List[Dict],
    sample_rate_khz: float = 12.5,
):
    """
    Acquire IQ data from multiple WebSDR receivers simultaneously.

    This task:
    1. Fetches IQ from all configured WebSDRs simultaneously
    2. Processes signal metrics
    3. Saves to MinIO (via separate storage task)
    4. Writes metadata to TimescaleDB (via separate DB task)

    Args:
        frequency_mhz: Target frequency in MHz
        duration_seconds: Acquisition duration in seconds
        start_time_iso: Start time in ISO format (echoed back in the result)
        websdrs_config_list: List of WebSDR config dicts (parsed into
            WebSDRConfig models)
        sample_rate_khz: Sample rate in kHz

    Returns:
        Dict with acquisition results: per-receiver measurements, error
        strings, counts, and timing metadata.

    Raises:
        Exception: re-raised on any failure so the AcquisitionTask base
            class can autoretry (max 3 attempts with backoff).
    """
    try:
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': len(websdrs_config_list),
                'status': 'Initializing fetcher...'
            }
        )

        # Parse WebSDR configs into validated models.
        websdrs = [
            WebSDRConfig(**cfg) for cfg in websdrs_config_list
        ]

        logger.info(
            "Starting acquisition task from %d WebSDRs at %.2f MHz for %.1f seconds",
            len(websdrs),
            frequency_mhz,
            duration_seconds
        )

        # Accumulators populated by the coroutine below.
        measurements = []
        errors = []

        import asyncio

        async def fetch_and_process():
            """Fetch IQ from every receiver concurrently, compute metrics per
            receiver, and report progress through the Celery task state."""
            async with WebSDRFetcher(
                websdrs=websdrs,
                timeout=30,
                retry_count=3,
                concurrent_limit=7
            ) as fetcher:
                # Update state: fetching
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': 0,
                        'total': len(websdrs),
                        'status': f'Fetching IQ from {len(websdrs)} WebSDRs...'
                    }
                )

                # Fetch all simultaneously; result maps websdr_id to a
                # (iq_data, error) pair where exactly one side is set.
                iq_data_dict = await fetcher.fetch_iq_simultaneous(
                    frequency_mhz=frequency_mhz,
                    duration_seconds=duration_seconds,
                    sample_rate_khz=sample_rate_khz
                )

                # Process results
                successful = 0
                for idx, (websdr_id, (iq_data, error)) in enumerate(iq_data_dict.items()):
                    if error:
                        errors.append(f"WebSDR {websdr_id}: {error}")
                        logger.warning("WebSDR %d acquisition failed: %s", websdr_id, error)
                    else:
                        try:
                            # Compute signal metrics (SNR, frequency offset, ...).
                            metrics = IQProcessor.compute_metrics(
                                iq_data=iq_data,
                                sample_rate_hz=int(sample_rate_khz * 1e3),
                                target_frequency_hz=int(frequency_mhz * 1e6),
                                noise_bandwidth_hz=10000
                            )

                            # Create measurement record.
                            # NOTE: datetime.utcnow() is deprecated in 3.12;
                            # kept here to preserve the naive-UTC timestamp
                            # format downstream consumers may parse.
                            measurement = {
                                'websdr_id': websdr_id,
                                'frequency_mhz': frequency_mhz,
                                'sample_rate_khz': sample_rate_khz,
                                'samples_count': len(iq_data),
                                'timestamp_utc': datetime.utcnow().isoformat(),
                                'metrics': metrics.dict(),
                                'iq_data_path': f's3://heimdall-raw-iq/sessions/{self.request.id}/websdr_{websdr_id}.npy',
                                'iq_data': iq_data.tolist(),  # For now, store in memory
                            }
                            measurements.append(measurement)
                            successful += 1

                            logger.info(
                                "Processed WebSDR %d - SNR: %.2f dB, Offset: %.2f Hz",
                                websdr_id,
                                metrics.snr_db,
                                metrics.frequency_offset_hz
                            )
                        except Exception as e:
                            # One bad receiver must not abort the whole batch.
                            error_msg = f"Processing error for WebSDR {websdr_id}: {str(e)}"
                            errors.append(error_msg)
                            logger.exception(error_msg)

                    # Update progress after each receiver, failed or not.
                    progress = ((idx + 1) / len(websdrs)) * 100
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current': idx + 1,
                            'total': len(websdrs),
                            'successful': successful,
                            'status': f'Processing {idx + 1}/{len(websdrs)} measurements...',
                            'progress': progress
                        }
                    )

                return measurements, errors

        # BUGFIX: the previous code reused the loop from
        # asyncio.get_event_loop(), which is deprecated and can return a
        # closed loop between task runs in a long-lived worker process.
        # asyncio.run() creates a fresh loop and always closes it.
        measurements, errors = asyncio.run(fetch_and_process())

        result = {
            'task_id': self.request.id,
            'status': 'SUCCESS' if measurements else 'PARTIAL_FAILURE',
            'measurements': measurements,
            'measurements_count': len(measurements),
            'errors': errors,
            'start_time': start_time_iso,
            'end_time': datetime.utcnow().isoformat(),
            'frequency_mhz': frequency_mhz,
            'duration_seconds': duration_seconds,
        }

        logger.info(
            "Acquisition complete: %d successful, %d errors",
            len(measurements),
            len(errors)
        )

        return result

    except Exception as e:
        # Re-raise so the AcquisitionTask base class autoretries.
        logger.exception("Acquisition task failed: %s", str(e))
        raise
@shared_task(bind=True)
def save_measurements_to_minio(
    self,
    task_id: str,
    measurements: List[Dict],
):
    """
    Save IQ data from measurements to MinIO.

    This task:
    1. Initializes MinIO client
    2. Stores each measurement's IQ data as .npy file
    3. Stores associated metadata as JSON
    4. Returns S3 paths for all stored files

    Args:
        task_id: Parent task ID (session ID), used as the storage prefix
        measurements: List of measurement records with 'iq_data' and 'websdr_id'

    Returns:
        Dict with storage results: status, counts, and per-measurement
        stored/failed detail lists. Never raises; failures are reported
        in the returned dict.
    """
    try:
        logger.info("Saving %d measurements to MinIO...", len(measurements))

        # Initialize MinIO client from service settings.
        minio_client = MinIOClient(
            endpoint_url=settings.minio_url,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            bucket_name=settings.minio_bucket_raw_iq,
        )

        # Ensure bucket exists before attempting any upload.
        if not minio_client.ensure_bucket_exists():
            logger.error("Failed to access MinIO bucket")
            return {
                'status': 'FAILED',
                'message': 'Failed to access MinIO bucket',
                # BUGFIX: report the actual batch size here, consistent
                # with every other return path (was hard-coded 0).
                'measurements_count': len(measurements),
                'successful': 0,
            }

        stored_measurements = []
        failed_measurements = []

        for idx, measurement in enumerate(measurements):
            try:
                websdr_id = measurement.get('websdr_id')
                iq_data = measurement.get('iq_data')

                # Validate required fields before touching storage.
                if websdr_id is None:
                    logger.error("Missing websdr_id in measurement at index %d", idx)
                    failed_measurements.append({
                        'websdr_id': None,
                        'error': 'Missing websdr_id',
                        'status': 'FAILED',
                    })
                    continue

                if iq_data is None:
                    logger.error("Missing iq_data for WebSDR %d", websdr_id)
                    failed_measurements.append({
                        'websdr_id': websdr_id,
                        'error': 'Missing iq_data',
                        'status': 'FAILED',
                    })
                    continue

                # Convert IQ data list to numpy array if needed.
                if isinstance(iq_data, list):
                    iq_array = np.array(iq_data, dtype=np.complex64)
                else:
                    iq_array = np.asarray(iq_data, dtype=np.complex64)

                # Prepare metadata (exclude iq_data to avoid duplication).
                metadata = {
                    k: v for k, v in measurement.items()
                    if k not in ['iq_data', 'iq_data_path']
                }

                # Upload to MinIO; on success `result` is the S3 path,
                # on failure it is an error string.
                success, result = minio_client.upload_iq_data(
                    iq_data=iq_array,
                    task_id=task_id,
                    websdr_id=int(websdr_id),
                    metadata=metadata,
                )

                if success:
                    stored_measurements.append({
                        'websdr_id': websdr_id,
                        's3_path': result,
                        'samples_count': len(iq_array),
                        'status': 'SUCCESS',
                    })
                    logger.info(
                        "Saved WebSDR %d measurement to %s",
                        websdr_id,
                        result
                    )
                else:
                    failed_measurements.append({
                        'websdr_id': websdr_id,
                        'error': result,
                        'status': 'FAILED',
                    })
                    logger.error(
                        "Failed to save WebSDR %d measurement: %s",
                        websdr_id,
                        result
                    )

                # Update progress after each attempted upload.
                progress = ((idx + 1) / len(measurements)) * 100
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': idx + 1,
                        'total': len(measurements),
                        'successful': len(stored_measurements),
                        'failed': len(failed_measurements),
                        'status': f'Storing {idx + 1}/{len(measurements)} measurements to MinIO...',
                        'progress': progress
                    }
                )

            except Exception as e:
                # One bad measurement must not abort the whole batch.
                error_msg = f"Exception storing WebSDR measurement: {str(e)}"
                logger.exception(error_msg)
                failed_measurements.append({
                    'websdr_id': measurement.get('websdr_id'),
                    'error': error_msg,
                    'status': 'FAILED',
                })

        result = {
            'status': 'SUCCESS' if not failed_measurements else 'PARTIAL_FAILURE',
            'message': f'Stored {len(stored_measurements)} measurements',
            'measurements_count': len(measurements),
            'successful': len(stored_measurements),
            'failed': len(failed_measurements),
            'stored_measurements': stored_measurements,
            'failed_measurements': failed_measurements,
        }

        logger.info(
            "MinIO storage complete: %d successful, %d failed",
            len(stored_measurements),
            len(failed_measurements)
        )

        return result

    except Exception as e:
        # Storage is best-effort: report failure instead of retrying.
        logger.exception("MinIO storage task failed: %s", str(e))
        return {
            'status': 'FAILED',
            'error': str(e),
            'measurements_count': len(measurements),
            'successful': 0,
        }
# BUGFIX: the docstring and the comment in the except-clause both promise
# automatic retries via AcquisitionTask, but the decorator previously was a
# bare @shared_task(bind=True) with no base class, so a raise simply failed
# the task. base=AcquisitionTask restores the promised retry behavior.
@shared_task(bind=True, base=AcquisitionTask)
def save_measurements_to_timescaledb(
    self,
    task_id: str,
    measurements: List[Dict],
    s3_paths: Optional[Dict[int, str]] = None,
):
    """
    Save measurement metadata to TimescaleDB.

    This task:
    1. Initializes database manager
    2. Bulk inserts measurements into measurements hypertable
    3. Handles partial failures gracefully
    4. Returns detailed result with counts

    Args:
        task_id: Parent task ID (session ID)
        measurements: List of measurement dictionaries
        s3_paths: Optional dict mapping websdr_id to S3 paths

    Returns:
        Dict with insertion results

    Raises:
        Will retry on database errors (max 3 times)
    """
    try:
        logger.info(
            "Saving %d measurements to TimescaleDB for task %s",
            len(measurements),
            task_id
        )

        # Initialize database manager (imported lazily to avoid pulling
        # DB dependencies into workers that never run this task).
        from ..storage.db_manager import get_db_manager
        db_manager = get_db_manager()

        # Verify database connection
        if not db_manager.check_connection():
            logger.error("Failed to connect to TimescaleDB")
            # Retry task - will trigger autoretry
            raise Exception("Database connection failed")

        # Update progress: starting
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': len(measurements),
                'status': f'Inserting {len(measurements)} measurements into TimescaleDB...',
                'progress': 0
            }
        )

        # Bulk insert measurements; returns (successful, failed) counts.
        successful, failed = db_manager.insert_measurements_bulk(
            task_id=task_id,
            measurements_list=measurements,
            s3_paths=s3_paths
        )

        # Update final progress
        progress = 100 if successful > 0 else 0
        self.update_state(
            state='PROGRESS',
            meta={
                'current': successful,
                'total': len(measurements),
                'successful': successful,
                'failed': failed,
                'status': f'Completed: {successful} inserted, {failed} failed',
                'progress': progress
            }
        )

        result = {
            'status': 'SUCCESS' if failed == 0 else 'PARTIAL_FAILURE',
            'message': f'Inserted {successful}/{len(measurements)} measurements',
            'measurements_count': len(measurements),
            'successful': successful,
            'failed': failed,
            'task_id': task_id,
        }

        logger.info(
            "TimescaleDB storage complete: %d successful, %d failed",
            successful,
            failed
        )

        return result

    except Exception as e:
        logger.exception("TimescaleDB storage task failed: %s", str(e))
        # Task will automatically retry due to base=AcquisitionTask
        raise
@shared_task(bind=True)
def health_check_websdrs(self):
    """
    Health check all configured WebSDRs.

    Loads the WebSDR configuration, then probes every receiver
    concurrently through the fetcher's health_check().

    Returns:
        Dict mapping WebSDR ID to health status; empty dict when no
        WebSDRs are configured.
    """
    import asyncio
    # Imported lazily to avoid a circular import with the router module.
    from ..routers.acquisition import get_websdrs_config

    # Load WebSDR configs from the configuration
    websdrs_config_list = get_websdrs_config()
    websdrs = [WebSDRConfig(**cfg) for cfg in websdrs_config_list]

    if not websdrs:
        logger.warning("No WebSDRs configured for health check")
        return {}

    async def check():
        async with WebSDRFetcher(websdrs=websdrs) as fetcher:
            return await fetcher.health_check()

    # BUGFIX: previously reused asyncio.get_event_loop(), which is
    # deprecated and can return a closed loop in a long-lived worker.
    # asyncio.run() creates a fresh loop and always closes it.
    health_status = asyncio.run(check())
    logger.info("WebSDR health check: %s", health_status)

    return health_status