Coverage for services/rf-acquisition/src/tasks/acquire_iq.py: 19%

145 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""Celery tasks for IQ data acquisition.""" 

2 

3import logging 

4import json 

5from datetime import datetime 

6from typing import Dict, List, Optional 

7from celery import shared_task, Task 

8from celery.utils.log import get_task_logger 

9import numpy as np 

10 

11from ..models.websdrs import WebSDRConfig, MeasurementRecord, AcquisitionRequest 

12from ..fetchers.websdr_fetcher import WebSDRFetcher 

13from ..processors.iq_processor import IQProcessor 

14from ..storage.minio_client import MinIOClient 

15from ..config import settings 

16 

17logger = get_task_logger(__name__) 

18 

19 

class AcquisitionTask(Task):
    """Celery base task providing shared retry policy for acquisition tasks.

    Any task declared with ``base=AcquisitionTask`` automatically retries on
    any raised exception, up to 3 attempts, with jittered exponential backoff
    capped at 10 minutes between attempts.
    """

    # Retry on any exception raised by the task body.
    autoretry_for = (Exception,)
    # Exponential backoff between attempts, randomized (jitter) to avoid
    # thundering-herd retries, and capped at 600 s.
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True
    # At most 3 automatic retries before the task is marked failed.
    retry_kwargs = {'max_retries': 3}

28 

29 

@shared_task(bind=True, base=AcquisitionTask)
def acquire_iq(
    self,
    frequency_mhz: float,
    duration_seconds: float,
    start_time_iso: str,
    websdrs_config_list: List[Dict],
    sample_rate_khz: float = 12.5,
) -> Dict:
    """
    Acquire IQ data from multiple WebSDR receivers simultaneously.

    This task:
    1. Fetches IQ from 7 WebSDR simultaneously
    2. Processes signal metrics
    3. Saves to MinIO (via separate storage task)
    4. Writes metadata to TimescaleDB (via separate DB task)

    Args:
        frequency_mhz: Target frequency in MHz
        duration_seconds: Acquisition duration in seconds
        start_time_iso: Start time in ISO format
        websdrs_config_list: List of WebSDR config dicts
        sample_rate_khz: Sample rate in kHz

    Returns:
        Dict with acquisition results

    Raises:
        Exception: re-raised on any failure so the AcquisitionTask base
            class can autoretry (max 3 attempts, exponential backoff).
    """
    try:
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': len(websdrs_config_list),
                'status': 'Initializing fetcher...'
            }
        )

        # Parse WebSDR configs
        websdrs = [
            WebSDRConfig(**cfg) for cfg in websdrs_config_list
        ]

        logger.info(
            "Starting acquisition task from %d WebSDRs at %.2f MHz for %.1f seconds",
            len(websdrs),
            frequency_mhz,
            duration_seconds
        )

        # Fetch IQ data; both lists are closed over by fetch_and_process().
        measurements = []
        errors = []

        import asyncio

        async def fetch_and_process():
            async with WebSDRFetcher(
                websdrs=websdrs,
                timeout=30,
                retry_count=3,
                concurrent_limit=7
            ) as fetcher:
                # Update state: fetching
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': 0,
                        'total': len(websdrs),
                        'status': f'Fetching IQ from {len(websdrs)} WebSDRs...'
                    }
                )

                # Fetch all simultaneously
                iq_data_dict = await fetcher.fetch_iq_simultaneous(
                    frequency_mhz=frequency_mhz,
                    duration_seconds=duration_seconds,
                    sample_rate_khz=sample_rate_khz
                )

                # Process results; each entry maps websdr_id -> (iq_data, error)
                successful = 0
                for idx, (websdr_id, (iq_data, error)) in enumerate(iq_data_dict.items()):
                    if error:
                        errors.append(f"WebSDR {websdr_id}: {error}")
                        logger.warning("WebSDR %d acquisition failed: %s", websdr_id, error)
                    else:
                        try:
                            # Compute metrics
                            metrics = IQProcessor.compute_metrics(
                                iq_data=iq_data,
                                sample_rate_hz=int(sample_rate_khz * 1e3),
                                target_frequency_hz=int(frequency_mhz * 1e6),
                                noise_bandwidth_hz=10000
                            )

                            # Create measurement record
                            measurement = {
                                'websdr_id': websdr_id,
                                'frequency_mhz': frequency_mhz,
                                'sample_rate_khz': sample_rate_khz,
                                'samples_count': len(iq_data),
                                'timestamp_utc': datetime.utcnow().isoformat(),
                                'metrics': metrics.dict(),
                                'iq_data_path': f's3://heimdall-raw-iq/sessions/{self.request.id}/websdr_{websdr_id}.npy',
                                'iq_data': iq_data.tolist(),  # For now, store in memory
                            }
                            measurements.append(measurement)
                            successful += 1

                            logger.info(
                                "Processed WebSDR %d - SNR: %.2f dB, Offset: %.2f Hz",
                                websdr_id,
                                metrics.snr_db,
                                metrics.frequency_offset_hz
                            )
                        except Exception as e:
                            # Processing failure for one receiver must not abort the
                            # others; record it and continue.
                            error_msg = f"Processing error for WebSDR {websdr_id}: {str(e)}"
                            errors.append(error_msg)
                            logger.exception(error_msg)

                    # Update progress
                    progress = ((idx + 1) / len(websdrs)) * 100
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current': idx + 1,
                            'total': len(websdrs),
                            'successful': successful,
                            'status': f'Processing {idx + 1}/{len(websdrs)} measurements...',
                            'progress': progress
                        }
                    )

                return measurements, errors

        # Run the async pipeline on a dedicated event loop.
        # NOTE: the previous asyncio.get_event_loop() fallback is deprecated
        # (and raises on Python 3.12+ when no loop is running), and a newly
        # created loop was never closed — leaking fds in long-lived Celery
        # workers. Create a fresh loop per invocation and always close it.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            measurements, errors = loop.run_until_complete(fetch_and_process())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

        result = {
            'task_id': self.request.id,
            'status': 'SUCCESS' if measurements else 'PARTIAL_FAILURE',
            'measurements': measurements,
            'measurements_count': len(measurements),
            'errors': errors,
            'start_time': start_time_iso,
            'end_time': datetime.utcnow().isoformat(),
            'frequency_mhz': frequency_mhz,
            'duration_seconds': duration_seconds,
        }

        logger.info(
            "Acquisition complete: %d successful, %d errors",
            len(measurements),
            len(errors)
        )

        return result

    except Exception as e:
        # Re-raise so the AcquisitionTask base class autoretries.
        logger.exception("Acquisition task failed: %s", str(e))
        raise

198 

199 

@shared_task(bind=True)
def save_measurements_to_minio(
    self,
    task_id: str,
    measurements: List[Dict],
) -> Dict:
    """
    Save IQ data from measurements to MinIO.

    This task:
    1. Initializes MinIO client
    2. Stores each measurement's IQ data as .npy file
    3. Stores associated metadata as JSON
    4. Returns S3 paths for all stored files

    Per-measurement failures are collected rather than raised, so one bad
    measurement never aborts the batch; the task itself returns a FAILED
    result dict (instead of raising) on unrecoverable errors.

    Args:
        task_id: Parent task ID (session ID)
        measurements: List of measurement records with 'iq_data' and 'websdr_id'

    Returns:
        Dict with storage results
    """
    try:
        logger.info("Saving %d measurements to MinIO...", len(measurements))

        # Initialize MinIO client from service settings
        minio_client = MinIOClient(
            endpoint_url=settings.minio_url,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            bucket_name=settings.minio_bucket_raw_iq,
        )

        # Ensure bucket exists; bail out early with a FAILED result if not,
        # since no individual upload could succeed.
        if not minio_client.ensure_bucket_exists():
            logger.error("Failed to access MinIO bucket")
            return {
                'status': 'FAILED',
                'message': 'Failed to access MinIO bucket',
                'measurements_count': 0,
                'successful': 0,
            }

        stored_measurements = []
        failed_measurements = []

        for idx, measurement in enumerate(measurements):
            try:
                websdr_id = measurement.get('websdr_id')
                iq_data = measurement.get('iq_data')

                # Validate required fields; record the failure and skip the
                # measurement rather than raising.
                if websdr_id is None:
                    logger.error("Missing websdr_id in measurement at index %d", idx)
                    failed_measurements.append({
                        'websdr_id': None,
                        'error': 'Missing websdr_id',
                        'status': 'FAILED',
                    })
                    continue

                if iq_data is None:
                    logger.error("Missing iq_data for WebSDR %d", websdr_id)
                    failed_measurements.append({
                        'websdr_id': websdr_id,
                        'error': 'Missing iq_data',
                        'status': 'FAILED',
                    })
                    continue

                # Convert IQ data list to numpy array if needed
                # (iq_data arrives as a plain list when the measurement dict
                # was serialized through the Celery result backend).
                if isinstance(iq_data, list):
                    iq_array = np.array(iq_data, dtype=np.complex64)
                else:
                    iq_array = np.asarray(iq_data, dtype=np.complex64)

                # Prepare metadata (exclude iq_data to avoid duplication)
                metadata = {
                    k: v for k, v in measurement.items()
                    if k not in ['iq_data', 'iq_data_path']
                }

                # Upload to MinIO; on success `result` is the S3 path, on
                # failure it is an error message.
                success, result = minio_client.upload_iq_data(
                    iq_data=iq_array,
                    task_id=task_id,
                    websdr_id=int(websdr_id),
                    metadata=metadata,
                )

                if success:
                    stored_measurements.append({
                        'websdr_id': websdr_id,
                        's3_path': result,
                        'samples_count': len(iq_array),
                        'status': 'SUCCESS',
                    })
                    logger.info(
                        "Saved WebSDR %d measurement to %s",
                        websdr_id,
                        result
                    )
                else:
                    failed_measurements.append({
                        'websdr_id': websdr_id,
                        'error': result,
                        'status': 'FAILED',
                    })
                    logger.error(
                        "Failed to save WebSDR %d measurement: %s",
                        websdr_id,
                        result
                    )

                # Update progress so callers polling the task see per-item state
                progress = ((idx + 1) / len(measurements)) * 100
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': idx + 1,
                        'total': len(measurements),
                        'successful': len(stored_measurements),
                        'failed': len(failed_measurements),
                        'status': f'Storing {idx + 1}/{len(measurements)} measurements to MinIO...',
                        'progress': progress
                    }
                )

            except Exception as e:
                # Unexpected per-measurement error (bad cast, client bug):
                # record and continue with the remaining measurements.
                error_msg = f"Exception storing WebSDR measurement: {str(e)}"
                logger.exception(error_msg)
                failed_measurements.append({
                    'websdr_id': measurement.get('websdr_id'),
                    'error': error_msg,
                    'status': 'FAILED',
                })

        result = {
            'status': 'SUCCESS' if not failed_measurements else 'PARTIAL_FAILURE',
            'message': f'Stored {len(stored_measurements)} measurements',
            'measurements_count': len(measurements),
            'successful': len(stored_measurements),
            'failed': len(failed_measurements),
            'stored_measurements': stored_measurements,
            'failed_measurements': failed_measurements,
        }

        logger.info(
            "MinIO storage complete: %d successful, %d failed",
            len(stored_measurements),
            len(failed_measurements)
        )

        return result

    except Exception as e:
        # Top-level failure (client init, settings): return FAILED rather
        # than raising — this task is not configured to autoretry.
        logger.exception("MinIO storage task failed: %s", str(e))
        return {
            'status': 'FAILED',
            'error': str(e),
            'measurements_count': len(measurements),
            'successful': 0,
        }

363 

364 

# NOTE: base=AcquisitionTask is required for the retry behavior documented
# below; the previous declaration used a plain shared_task, so the re-raise
# at the bottom terminally failed the task instead of retrying.
@shared_task(bind=True, base=AcquisitionTask)
def save_measurements_to_timescaledb(
    self,
    task_id: str,
    measurements: List[Dict],
    s3_paths: Optional[Dict[int, str]] = None,
) -> Dict:
    """
    Save measurement metadata to TimescaleDB.

    This task:
    1. Initializes database manager
    2. Bulk inserts measurements into measurements hypertable
    3. Handles partial failures gracefully
    4. Returns detailed result with counts

    Args:
        task_id: Parent task ID (session ID)
        measurements: List of measurement dictionaries
        s3_paths: Optional dict mapping websdr_id to S3 paths

    Returns:
        Dict with insertion results

    Raises:
        Will retry on database errors (max 3 times)
    """
    try:
        logger.info(
            "Saving %d measurements to TimescaleDB for task %s",
            len(measurements),
            task_id
        )

        # Initialize database manager (imported lazily to avoid pulling DB
        # dependencies into workers that never run this task)
        from ..storage.db_manager import get_db_manager
        db_manager = get_db_manager()

        # Verify database connection
        if not db_manager.check_connection():
            logger.error("Failed to connect to TimescaleDB")
            # Retry task - will trigger autoretry
            raise Exception("Database connection failed")

        # Update progress: starting
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': len(measurements),
                'status': f'Inserting {len(measurements)} measurements into TimescaleDB...',
                'progress': 0
            }
        )

        # Bulk insert measurements; returns (successful, failed) counts
        successful, failed = db_manager.insert_measurements_bulk(
            task_id=task_id,
            measurements_list=measurements,
            s3_paths=s3_paths
        )

        # Update final progress
        progress = 100 if successful > 0 else 0
        self.update_state(
            state='PROGRESS',
            meta={
                'current': successful,
                'total': len(measurements),
                'successful': successful,
                'failed': failed,
                'status': f'Completed: {successful} inserted, {failed} failed',
                'progress': progress
            }
        )

        result = {
            'status': 'SUCCESS' if failed == 0 else 'PARTIAL_FAILURE',
            'message': f'Inserted {successful}/{len(measurements)} measurements',
            'measurements_count': len(measurements),
            'successful': successful,
            'failed': failed,
            'task_id': task_id,
        }

        logger.info(
            "TimescaleDB storage complete: %d successful, %d failed",
            successful,
            failed
        )

        return result

    except Exception as e:
        logger.exception("TimescaleDB storage task failed: %s", str(e))
        # Task will automatically retry due to base=AcquisitionTask
        raise

462 

463 

@shared_task(bind=True)
def health_check_websdrs(self) -> Dict:
    """
    Health check all configured WebSDRs.

    Loads the WebSDR configuration, then concurrently probes every receiver
    via WebSDRFetcher.health_check().

    Returns:
        Dict mapping WebSDR ID to health status (empty dict when no
        WebSDRs are configured)
    """
    import asyncio
    # Imported lazily to avoid a circular import with the router module.
    from ..routers.acquisition import get_websdrs_config

    # Load WebSDR configs from the configuration
    websdrs_config_list = get_websdrs_config()
    websdrs = [WebSDRConfig(**cfg) for cfg in websdrs_config_list]

    if not websdrs:
        logger.warning("No WebSDRs configured for health check")
        return {}

    async def check():
        async with WebSDRFetcher(websdrs=websdrs) as fetcher:
            return await fetcher.health_check()

    # Run on a dedicated event loop.
    # NOTE: the previous asyncio.get_event_loop() fallback is deprecated
    # (and raises on Python 3.12+ outside a running loop), and a newly
    # created loop was never closed. Create one per invocation and always
    # close it to avoid leaking fds in long-lived workers.
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        health_status = loop.run_until_complete(check())
    finally:
        asyncio.set_event_loop(None)
        loop.close()

    logger.info("WebSDR health check: %s", health_status)

    return health_status