Coverage for services/data-ingestion-web/src/tasks.py: 0%
32 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""Celery tasks for RF acquisition orchestration"""
2import os
3import json
4from datetime import datetime
5from celery import Celery
6from kombu import Exchange, Queue
# Celery configuration
# Broker and result-backend URLs are read from the environment; when a
# variable is unset the hard-coded rabbitmq / redis URLs below are used.
redis_password = os.environ.get("REDIS_PASSWORD", "changeme")
CELERY_BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL",
    "amqp://guest:guest@rabbitmq:5672//",
)
CELERY_RESULT_BACKEND = os.environ.get(
    "CELERY_RESULT_BACKEND",
    f"redis://:{redis_password}@redis:6379/1",
)

# Application object that the tasks in this module register against.
celery_app = Celery(
    "data_ingestion",
    backend=CELERY_RESULT_BACKEND,
    broker=CELERY_BROKER_URL,
)
# Configure task routing
# Routes are looked up by the task's *registered* name. The acquisition task
# is registered with an explicit name ("data_ingestion.trigger_acquisition"),
# so the route must be keyed on that name rather than on the module path.
celery_app.conf.task_routes = {
    "data_ingestion.trigger_acquisition": {
        "queue": "acquisition.websdr-fetch",
        "routing_key": "acquisition.websdr-fetch",
    },
}

# Configure queues
# The Celery setting that declares queues is `task_queues`; assigning to
# `conf.queues` is not a recognised setting and the declaration is ignored.
celery_app.conf.task_queues = (
    Queue(
        "acquisition.websdr-fetch",
        Exchange("acquisition", type="direct"),
        routing_key="acquisition.websdr-fetch",
    ),
)
# General configuration
# Applied as a single mapping: JSON-only serialization, UTC timestamps,
# "started" state tracking, and hard/soft per-task time limits.
celery_app.conf.update(
    {
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "task_time_limit": 600,  # 10 minutes (hard limit)
        "task_soft_time_limit": 300,  # 5 minutes (soft limit)
    }
)
@celery_app.task(bind=True, name="data_ingestion.trigger_acquisition")
def trigger_acquisition(
    self,
    session_id: int,
    frequency_mhz: float,
    duration_seconds: int,
) -> dict:
    """Trigger an RF acquisition through the rf-acquisition service API.

    POSTs ``frequency_mhz`` and ``duration_seconds`` as JSON to
    ``{RF_ACQUISITION_API_URL}/api/acquire`` (default base URL
    ``http://rf-acquisition:8001``) and wraps the decoded JSON response.

    Args:
        session_id: Identifier echoed back in the result and the log line;
            it is not sent to the acquisition API.
        frequency_mhz: Frequency forwarded to the API, in MHz.
        duration_seconds: Acquisition duration forwarded to the API; also
            used to derive the HTTP timeout (``duration_seconds + 60``).

    Returns:
        On success: ``{"status": "completed", "session_id", "task_id",
        "result"}`` where ``result`` is the API's JSON body.
        On any failure: ``{"status": "failed", "session_id", "task_id",
        "error"}``. Errors are reported in the payload rather than raised.
    """
    import logging

    import requests

    logger = logging.getLogger(__name__)

    def _failure(error_msg: str) -> dict:
        # Single place that builds the failure payload for both handlers.
        return {
            "status": "failed",
            "session_id": session_id,
            "task_id": self.request.id,
            "error": error_msg,
        }

    # Call RF acquisition API
    # The rf-acquisition service is available at http://rf-acquisition:8001 in docker-compose
    rf_api_url = os.getenv("RF_ACQUISITION_API_URL", "http://rf-acquisition:8001")

    logger.info(f"Triggering RF acquisition for session {session_id} at {frequency_mhz} MHz")

    try:
        response = requests.post(
            f"{rf_api_url}/api/acquire",
            json={
                "frequency_mhz": frequency_mhz,
                "duration_seconds": duration_seconds,
            },
            # Allow the full acquisition duration plus a margin before the
            # HTTP request times out.
            # NOTE(review): this can exceed the 300 s soft time limit set in
            # the module configuration when duration_seconds > 240 - confirm.
            timeout=duration_seconds + 60,
        )
        response.raise_for_status()
        acquisition_data = response.json()
    except requests.exceptions.RequestException as e:
        error_msg = f"RF acquisition failed: {str(e)}"
        logger.error(error_msg)
        return _failure(error_msg)
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        # Unexpected failures are logged with the traceback so they can be
        # diagnosed; the message alone is rarely enough.
        logger.exception(error_msg)
        return _failure(error_msg)

    return {
        "status": "completed",
        "session_id": session_id,
        "task_id": self.request.id,
        "result": acquisition_data,
    }