Coverage for services/data-ingestion-web/src/tasks.py: 0%

32 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""Celery tasks for RF acquisition orchestration""" 

2import os 

3import json 

4from datetime import datetime 

5from celery import Celery 

6from kombu import Exchange, Queue 

7 

# Celery configuration
# Broker and result-backend URLs are read from the environment. The fallback
# values point at hosts named "rabbitmq" and "redis"; the Redis fallback URL
# embeds REDIS_PASSWORD and selects database 1.
redis_password = os.environ.get("REDIS_PASSWORD", "changeme")
CELERY_BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL",
    "amqp://guest:guest@rabbitmq:5672//",
)
CELERY_RESULT_BACKEND = os.environ.get(
    "CELERY_RESULT_BACKEND",
    f"redis://:{redis_password}@redis:6379/1",
)

# Application object the task decorator and the conf settings in this module
# are attached to.
celery_app = Celery("data_ingestion", backend=CELERY_RESULT_BACKEND, broker=CELERY_BROKER_URL)

18 

# Configure task routing
# Routes are matched against the *registered* task name. The task in this
# module is registered with an explicit name
# ("data_ingestion.trigger_acquisition"), so the route must use that name;
# keying it by the module path never matches and the route is not applied.
celery_app.conf.task_routes = {
    "data_ingestion.trigger_acquisition": {
        "queue": "acquisition.websdr-fetch",
        "routing_key": "acquisition.websdr-fetch",
    }
}

# Configure queues
# The Celery setting is ``task_queues``; assigning to ``conf.queues`` only
# stores an unused key, so the direct "acquisition" exchange binding below
# would never be declared.
# NOTE(review): with both settings effective, tasks are published to
# "acquisition.websdr-fetch" instead of the default queue -- confirm workers
# are not pinned to a different queue via -Q.
celery_app.conf.task_queues = (
    Queue(
        "acquisition.websdr-fetch",
        Exchange("acquisition", type="direct"),
        routing_key="acquisition.websdr-fetch",
    ),
)

35 

# General configuration
# JSON-only serialization, UTC timestamps, STARTED-state tracking, and a
# soft/hard execution limit pair (values are in seconds).
celery_app.conf.update(
    {
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "task_time_limit": 600,  # 10 minutes
        "task_soft_time_limit": 300,  # 5 minutes soft timeout
    }
)

47 

48 

@celery_app.task(bind=True, name="data_ingestion.trigger_acquisition")
def trigger_acquisition(
    self,
    session_id: int,
    frequency_mhz: float,
    duration_seconds: int,
) -> dict:
    """Trigger RF acquisition from WebSDR receivers.

    POSTs ``frequency_mhz`` and ``duration_seconds`` as JSON to
    ``<RF_ACQUISITION_API_URL>/api/acquire`` (default base URL
    ``http://rf-acquisition:8001``) and wraps the JSON response.

    Args:
        session_id: Identifier echoed back in the result and used in the log
            message; it is not sent to the acquisition API.
        frequency_mhz: Frequency forwarded to the acquisition API.
        duration_seconds: Duration forwarded to the acquisition API; also
            used to size the HTTP timeout.

    Returns:
        On success: ``{"status": "completed", "session_id", "task_id",
        "result"}`` where ``result`` is the decoded API response.
        On any error: ``{"status": "failed", "session_id", "task_id",
        "error"}``. Errors are logged and reported in the payload rather
        than re-raised.
    """
    import logging

    import requests

    logger = logging.getLogger(__name__)

    def _failed(error_msg: str) -> dict:
        # Only called from inside an ``except`` block, so logger.exception
        # attaches the active traceback to the log record.
        logger.exception(error_msg)
        return {
            "status": "failed",
            "session_id": session_id,
            "task_id": self.request.id,
            "error": error_msg,
        }

    try:
        # Call RF acquisition API
        # The rf-acquisition service is available at http://rf-acquisition:8001 in docker-compose
        rf_api_url = os.getenv("RF_ACQUISITION_API_URL", "http://rf-acquisition:8001")

        logger.info(
            "Triggering RF acquisition for session %s at %s MHz",
            session_id,
            frequency_mhz,
        )

        response = requests.post(
            f"{rf_api_url}/api/acquire",
            json={
                "frequency_mhz": frequency_mhz,
                "duration_seconds": duration_seconds,
            },
            # Allow the requested duration plus a 60 s margin.
            timeout=duration_seconds + 60,
        )
        # Turn 4xx/5xx responses into an HTTPError (a RequestException).
        response.raise_for_status()

        return {
            "status": "completed",
            "session_id": session_id,
            "task_id": self.request.id,
            "result": response.json(),
        }

    except requests.exceptions.RequestException as exc:
        return _failed(f"RF acquisition failed: {exc}")
    except Exception as exc:
        # Task-boundary catch-all: anything else is also reported as "failed".
        return _failed(f"Unexpected error: {exc}")