Coverage for services/rf-acquisition/src/main.py: 76%

42 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1import logging 

2from datetime import datetime 

3from fastapi import FastAPI 

4from fastapi.middleware.cors import CORSMiddleware 

5from fastapi.responses import JSONResponse 

6from celery import Celery 

7 

8from .config import settings 

9from .models.health import HealthResponse 

10from .routers.acquisition import router as acquisition_router 

11 

# Identity of this service; reused by the app metadata, the Celery app name
# and the endpoint payloads below.
SERVICE_NAME = "rf-acquisition"
SERVICE_VERSION = "0.1.0"
SERVICE_PORT = 8001

# Process-wide logging: INFO and above, prefixed with timestamp, logger name
# and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

22 

# ASGI application object; title and version are derived from the service
# constants.
app = FastAPI(
    description="RF data acquisition service for Heimdall SDR platform",
    title=f"Heimdall SDR - {SERVICE_NAME}",
    version=SERVICE_VERSION,
)

# Fully permissive CORS policy.
# NOTE(review): wildcard origins together with allow_credentials=True lets any
# site issue credentialed cross-origin requests -- confirm this is intended
# outside local development and consider an explicit origin allow-list.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)

37 

# Celery application named after the service; broker and result backend URLs
# come from the project settings object.
celery_app = Celery(
    SERVICE_NAME,
    backend=settings.celery_result_backend_url,
    broker=settings.celery_broker_url,
)

# JSON-only serialization, UTC timestamps, STARTED-state tracking, and
# per-task time limits (soft limit is set five minutes below the hard limit).
celery_app.conf.update(
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    enable_utc=True,
    timezone="UTC",
    task_track_started=True,
    task_soft_time_limit=25 * 60,  # 25 minutes
    task_time_limit=30 * 60,  # 30 minutes
)

# Celery Beat: schedule the WebSDR uptime monitor task once a minute.
celery_app.conf.beat_schedule = {
    "monitor-websdrs-uptime": {
        "schedule": 60.0,  # seconds
        "task": "monitor_websdrs_uptime",
    },
}

63 

# Mount the acquisition routes on the application.
app.include_router(router=acquisition_router)

66 

67 

@app.get("/")
async def root():
    """Report that the service is up.

    Returns:
        dict: ``service`` (the service name), ``status`` (always
        ``"running"``) and ``timestamp`` (current UTC time as an ISO-8601
        string without a UTC-offset suffix).
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and strip tzinfo so the emitted string keeps its existing format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "service": SERVICE_NAME,
        "status": "running",
        "timestamp": now_utc.isoformat(),
    }

76 

77 

@app.get("/health")
async def health_check():
    """Liveness probe.

    Returns:
        HealthResponse: status ``"healthy"`` together with the service name,
        service version and the current UTC time (naive datetime).
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and strip tzinfo so the model still receives a naive UTC value.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return HealthResponse(
        status="healthy",
        service=SERVICE_NAME,
        version=SERVICE_VERSION,
        timestamp=now_utc,
    )

87 

88 

@app.get("/ready")
async def readiness_check():
    """Readiness probe.

    When ``settings.celery_check_required`` is truthy, Celery workers are
    queried through the control API and the service is reported as not ready
    if no reply comes back.

    Returns:
        JSONResponse:
            * 200 ``{"ready": True, "service": ...}`` when the check passes
              or is not required.
            * 503 ``{"ready": False, "reason": ...}`` when Celery does not
              respond, or when the check raises outside development.
            * 200 with an extra ``"warning"`` key when the check raises and
              ``settings.environment == "development"``.

    Note:
        The development fallback only covers the exception path; a missing
        reply from Celery yields 503 in every environment.
    """
    # Function-scope import keeps this change self-contained.
    import asyncio

    try:
        # Check Celery connectivity only if required.
        if settings.celery_check_required:
            inspector = celery_app.control.inspect()
            # active() performs a blocking broker round-trip; run it in a
            # worker thread so the event loop keeps serving other requests.
            active_tasks = (
                None
                if inspector is None
                else await asyncio.to_thread(inspector.active)
            )
            if active_tasks is None:
                return JSONResponse(
                    status_code=503,
                    content={"ready": False, "reason": "Celery broker not responding"},
                )

        return JSONResponse(
            status_code=200,
            content={"ready": True, "service": SERVICE_NAME},
        )
    except Exception as exc:  # probe boundary: always answer, never propagate
        logger.warning("Readiness check warning: %s", exc)
        # In development mode, still report ready even if the Celery check
        # raised, surfacing the error text as a warning.
        if settings.environment == "development":
            return JSONResponse(
                status_code=200,
                content={"ready": True, "service": SERVICE_NAME, "warning": str(exc)},
            )
        return JSONResponse(
            status_code=503,
            content={"ready": False, "reason": str(exc)},
        )

118 

119 

if __name__ == "__main__":
    # Script entry point: uvicorn is imported lazily so that importing this
    # module as a package member does not require it.
    from uvicorn import run as _serve

    # Listen on all interfaces at the service's fixed port.
    _serve(app, host="0.0.0.0", port=SERVICE_PORT)