Coverage for services/api-gateway/src/main.py: 38%

262 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1from datetime import datetime, timedelta 

2from fastapi import FastAPI, Request, HTTPException, Depends, WebSocket, WebSocketDisconnect 

3from fastapi.middleware.cors import CORSMiddleware 

4from fastapi.responses import JSONResponse 

5import httpx 

6import logging 

7import os 

8import sys 

9import asyncio 

10 

11# Add parent directory to path for auth module 

12sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..', 'common')) 

13 

14from .config import settings 

15from .models.health import HealthResponse 

16from .websocket_manager import manager as ws_manager, heartbeat_task 

17 

# Keycloak authentication is optional. When the shared ``auth`` package cannot
# be imported, the gateway swaps in an anonymous user and pass-through
# dependencies so that every route keeps working without credentials.
logger = logging.getLogger(__name__)

try:
    from auth.keycloak_auth import get_current_user, require_admin, require_operator
    from auth.models import User
except ImportError as e:
    AUTH_ENABLED = False
    logger.warning(f"⚠️ Authentication disabled - could not import auth module: {e}")

    class User:
        """Placeholder user handed to routes while authentication is disabled."""

        def __init__(self):
            self.id = self.username = "anonymous"
            self.email = "anonymous@heimdall.local"
            self.roles = []
            # The anonymous user carries no role flags; routes gate on
            # AUTH_ENABLED before consulting them.
            self.is_admin = self.is_operator = self.is_viewer = False

    async def get_current_user():
        """Dummy auth - returns anonymous user when auth is disabled."""
        return User()

    def require_admin():
        """Build a dependency that accepts whatever get_current_user yields."""

        async def _require_admin(user: User = Depends(get_current_user)):
            return user

        return _require_admin

    def require_operator():
        """Build a dependency that accepts whatever get_current_user yields."""

        async def _require_operator(user: User = Depends(get_current_user)):
            return user

        return _require_operator
else:
    AUTH_ENABLED = True
    logger.info("✅ Keycloak authentication enabled")

55 

# Identity of this service; reused in the OpenAPI title and health payloads.
SERVICE_NAME = "api-gateway"
SERVICE_VERSION = "0.1.0"
SERVICE_PORT = 8000

# Backend service URLs - from settings
RF_ACQUISITION_URL = settings.rf_acquisition_url
INFERENCE_URL = settings.inference_url
TRAINING_URL = settings.training_url
DATA_INGESTION_URL = settings.data_ingestion_url

app = FastAPI(
    title=f"Heimdall SDR - {SERVICE_NAME}",
    version=SERVICE_VERSION,
)

# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed - confirm whether this is intended outside of demos.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

69 

70 

# Request headers that must be recomputed for the outgoing call instead of
# being copied from the client: the target host differs and httpx derives the
# body framing from the content it is actually given.
_UNFORWARDED_REQUEST_HEADERS = frozenset({"host", "content-length", "transfer-encoding"})

# Response headers that describe the backend connection or the backend's body
# encoding. The gateway re-serialises the body as JSON, so copying these would
# mislabel the length/encoding/type of what is really sent to the client.
_UNFORWARDED_RESPONSE_HEADERS = frozenset({
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
    "content-length",
    "content-encoding",
    "content-type",
})


async def proxy_request(request: Request, target_url: str):
    """
    Proxy HTTP request to backend service.

    The incoming path and query string are appended to ``target_url`` and the
    request is replayed with the same method, headers and body. The backend
    reply is parsed as JSON (falling back to ``{"raw": <first 500 chars>}``
    when it is not JSON, or ``{}`` when it is empty) and returned with the
    backend status code.

    Args:
        request: Incoming FastAPI request
        target_url: Target backend service URL

    Returns:
        JSONResponse with backend response

    Raises:
        HTTPException: 504 on backend timeout, 503 when the backend cannot be
            reached, 500 for any other failure while proxying.
    """
    # Build full target URL
    path = request.url.path
    query = str(request.url.query)
    full_url = f"{target_url}{path}"
    if query:
        full_url = f"{full_url}?{query}"

    logger.info(f"🔄 Proxy: {request.method} {full_url}")

    # Read the body for every method (DELETE may carry one too); an empty body
    # is passed on as "no content" so httpx does not emit framing headers.
    body = await request.body() or None

    # Forward client headers except those httpx must compute itself.
    headers = dict(request.headers)
    for header_name in _UNFORWARDED_REQUEST_HEADERS:
        headers.pop(header_name, None)

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.request(
                method=request.method,
                url=full_url,
                headers=headers,
                content=body,
            )

            logger.info(f"✅ Proxy response: {response.status_code} from {full_url}")

            # Parse response content safely
            try:
                content = response.json() if response.text else {}
            except Exception as json_err:
                logger.warning(f"⚠️ Response is not JSON: {str(json_err)}")
                content = {"raw": response.text[:500]}

            # Drop hop-by-hop and body-describing headers; JSONResponse sets
            # its own content-length and content-type for the new body.
            forwarded_headers = {
                name: value
                for name, value in dict(response.headers).items()
                if name.lower() not in _UNFORWARDED_RESPONSE_HEADERS
            }

            return JSONResponse(
                content=content,
                status_code=response.status_code,
                headers=forwarded_headers,
            )
        except httpx.TimeoutException:
            logger.error(f"❌ Timeout proxying request to {full_url}")
            raise HTTPException(status_code=504, detail="Backend service timeout")
        except httpx.ConnectError as e:
            logger.error(f"❌ Connection error proxying request to {full_url}: {str(e)}")
            raise HTTPException(status_code=503, detail="Backend service unavailable")
        except Exception as e:
            logger.exception(f"❌ Error proxying request to {full_url}: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Proxy error: {str(e)}")

132 

133 

134# ============================================================================= 

135# PUBLIC HEALTH ENDPOINTS - No authentication required 

136# ============================================================================= 

137 

@app.get("/health")
async def health_check():
    """Return a static "healthy" report for the gateway itself.

    No backend is contacted; the payload carries the service name, version
    and the current UTC time.
    """
    report_fields = {
        "status": "healthy",
        "service": SERVICE_NAME,
        "version": SERVICE_VERSION,
        "timestamp": datetime.utcnow(),
    }
    return HealthResponse(**report_fields)

147 

148 

@app.get("/ready")
async def readiness_check():
    """Report readiness; always answers ``{"ready": True}``."""
    readiness = {"ready": True}
    return readiness

153 

154 

# These must be defined BEFORE the catch-all {path:path} routes to take precedence
@app.get("/api/v1/acquisition/health")
async def acquisition_health_public(request: Request):
    """Unauthenticated health probe, proxied as-is to the RF Acquisition service."""
    logger.debug("📡 Public health check: /api/v1/acquisition/health")
    backend_reply = await proxy_request(request, RF_ACQUISITION_URL)
    return backend_reply

161 

162 

@app.get("/api/v1/acquisition/websdrs")
async def acquisition_websdrs_public(request: Request):
    """Unauthenticated WebSDR listing, proxied as-is to the RF Acquisition service."""
    logger.debug("📡 Public WebSDRs endpoint: /api/v1/acquisition/websdrs")
    backend_reply = await proxy_request(request, RF_ACQUISITION_URL)
    return backend_reply

168 

169 

@app.get("/api/v1/inference/health")
async def inference_health_public(request: Request):
    """Unauthenticated health probe, proxied as-is to the Inference service."""
    logger.debug("🧠 Public health check: /api/v1/inference/health")
    backend_reply = await proxy_request(request, INFERENCE_URL)
    return backend_reply

175 

176 

@app.get("/api/v1/api-gateway/health")
async def api_gateway_health_public():
    """Return the gateway's own "healthy" report under the /api/v1 namespace.

    Produces the same fields as the plain /health endpoint and contacts no
    backend service.
    """
    logger.debug("🌐 API Gateway health check: /api/v1/api-gateway/health")
    report_fields = {
        "status": "healthy",
        "service": SERVICE_NAME,
        "version": SERVICE_VERSION,
        "timestamp": datetime.utcnow(),
    }
    return HealthResponse(**report_fields)

187 

188 

@app.get("/api/v1/rf-acquisition/health")
async def rf_acquisition_health_public():
    """Report RF Acquisition health by calling the backend's /health endpoint.

    A 200 reply is passed through as parsed JSON. Any other status, or any
    error while calling or parsing, yields an "unhealthy" HealthResponse
    instead of an HTTP error.
    """

    def _unhealthy():
        # Timestamp is taken when the failure is reported.
        return HealthResponse(
            status="unhealthy",
            service="rf-acquisition",
            version="unknown",
            timestamp=datetime.utcnow(),
        )

    logger.debug("📡 RF Acquisition health check: /api/v1/rf-acquisition/health")
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{RF_ACQUISITION_URL}/health")
            if response.status_code != 200:
                return _unhealthy()
            return response.json()
        except Exception as e:
            logger.warning(f"⚠️ Could not reach rf-acquisition health: {str(e)}")
            return _unhealthy()

203 

204 

# NOTE(review): GET /api/v1/inference/health is also registered earlier in this
# module by inference_health_public. With in-order route matching the earlier
# handler presumably wins and this one is never selected - confirm which of the
# two behaviours is intended.
@app.get("/api/v1/inference/health")
async def inference_health_check():
    """Report Inference health by calling the backend's /health endpoint.

    A 200 reply is passed through as parsed JSON. Any other status, or any
    error while calling or parsing, yields an "unhealthy" HealthResponse
    instead of an HTTP error.
    """

    def _unhealthy():
        # Timestamp is taken when the failure is reported.
        return HealthResponse(
            status="unhealthy",
            service="inference",
            version="unknown",
            timestamp=datetime.utcnow(),
        )

    logger.debug("🧠 Inference health check: /api/v1/inference/health")
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{INFERENCE_URL}/health")
            if response.status_code != 200:
                return _unhealthy()
            return response.json()
        except Exception as e:
            logger.warning(f"⚠️ Could not reach inference health: {str(e)}")
            return _unhealthy()

219 

220 

# OLD ENDPOINTS - Kept for backward compatibility but use /api/v1/{service}/health instead
# NOTE(review): GET /api/v1/acquisition/health is also registered earlier in
# this module by acquisition_health_public. With in-order route matching the
# earlier handler presumably wins - confirm whether this one is still reachable.
@app.get("/api/v1/acquisition/health")
async def acquisition_health_deprecated(request: Request):
    """DEPRECATED: Use /api/v1/rf-acquisition/health instead.

    Calls the RF Acquisition backend's /health endpoint. A 200 reply is passed
    through as parsed JSON; any other status yields an "unhealthy"
    HealthResponse.

    Raises:
        HTTPException: 503 when the backend call or the parsing of its reply
            fails. The underlying error is logged and chained as the cause.
    """
    logger.debug("📡 Deprecated: /api/v1/acquisition/health → /api/v1/rf-acquisition/health")
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{RF_ACQUISITION_URL}/health")
            if response.status_code == 200:
                return response.json()
            return HealthResponse(
                status="unhealthy",
                service="rf-acquisition",
                version="unknown",
                timestamp=datetime.utcnow(),
            )
        except Exception as e:
            # Record why the backend was considered unavailable instead of
            # discarding the original error.
            logger.warning(f"⚠️ Could not reach rf-acquisition health: {str(e)}")
            raise HTTPException(status_code=503, detail="rf-acquisition unavailable") from e

235 

236 

# =============================================================================
# PROTECTED ACQUISITION ENDPOINTS - Requires auth (falls back to anonymous if disabled)
# =============================================================================

241 

@app.api_route("/api/v1/acquisition/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_rf_acquisition(
    request: Request,
    path: str,
    user: User = Depends(get_current_user),
):
    """Forward any /api/v1/acquisition/* request to the RF Acquisition service.

    When authentication is enabled the caller must have ``is_operator`` set,
    otherwise a 403 is raised. With authentication disabled the role check is
    skipped entirely.
    """
    operator_missing = AUTH_ENABLED and not user.is_operator
    if operator_missing:
        raise HTTPException(status_code=403, detail="Operator access required")

    logger.debug(f"📡 Acquisition route matched: path={path} (user={user.username})")
    backend_reply = await proxy_request(request, RF_ACQUISITION_URL)
    return backend_reply

253 

254 

255# ============================================================================= 

256# PROTECTED INFERENCE ENDPOINTS - Requires auth (falls back to anonymous if disabled) 

257# ============================================================================= 

258 

@app.api_route("/api/v1/inference/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_inference(
    request: Request,
    path: str,
    user: User = Depends(get_current_user),
):
    """Forward any /api/v1/inference/* request to the Inference service.

    When authentication is enabled the caller must have ``is_viewer`` set,
    otherwise a 403 is raised. With authentication disabled the role check is
    skipped entirely.
    """
    viewer_missing = AUTH_ENABLED and not user.is_viewer
    if viewer_missing:
        raise HTTPException(status_code=403, detail="Viewer access required")

    logger.debug(f"🧠 Inference route matched: path={path} (user={user.username})")
    backend_reply = await proxy_request(request, INFERENCE_URL)
    return backend_reply

270 

271 

# =============================================================================
# PROTECTED TRAINING ENDPOINTS - Requires auth (falls back to anonymous if disabled)
# =============================================================================

279 

@app.api_route("/api/v1/training/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_training(
    request: Request,
    path: str,
    user: User = Depends(get_current_user),
):
    """Forward any /api/v1/training/* request to the Training service.

    When authentication is enabled the caller must have ``is_operator`` set,
    otherwise a 403 is raised. With authentication disabled the role check is
    skipped entirely.
    """
    operator_missing = AUTH_ENABLED and not user.is_operator
    if operator_missing:
        raise HTTPException(status_code=403, detail="Operator access required")

    logger.debug(f"📚 Training route matched: path={path} (user={user.username})")
    backend_reply = await proxy_request(request, TRAINING_URL)
    return backend_reply

291 

292 

# =============================================================================
# PROTECTED DATA INGESTION ENDPOINTS - Requires auth (falls back to anonymous if disabled)
# =============================================================================

300 

@app.api_route("/api/v1/sessions/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_data_ingestion(
    request: Request,
    path: str,
    user: User = Depends(get_current_user),
):
    """Forward any /api/v1/sessions/* request to the Data Ingestion service.

    When authentication is enabled the caller must have ``is_operator`` set,
    otherwise a 403 is raised. With authentication disabled the role check is
    skipped entirely.
    """
    operator_missing = AUTH_ENABLED and not user.is_operator
    if operator_missing:
        raise HTTPException(status_code=403, detail="Operator access required")

    logger.debug(f"💾 Data Ingestion route matched: path={path} (user={user.username})")
    backend_reply = await proxy_request(request, DATA_INGESTION_URL)
    return backend_reply

312 

313 

314# ============================================================================= 

315# PUBLIC ANALYTICS ENDPOINTS 

316# ============================================================================= 

317 

@app.api_route("/api/v1/analytics/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_to_inference_analytics(request: Request, path: str):
    """Forward any /api/v1/analytics/* request to the Inference service.

    No user dependency is declared on this route, so it is served without an
    authentication or role check (public access for demo).
    """
    logger.debug(f"📊 Analytics route matched: path={path}")
    backend_reply = await proxy_request(request, INFERENCE_URL)
    return backend_reply

323 

324 

# =============================================================================
# ROOT ENDPOINT
# =============================================================================

328 

@app.get("/")
async def root():
    """Describe the running service.

    Returns the service name, a fixed "running" status, the current UTC time
    in ISO format and whether authentication is enabled.
    """
    service_info = {"service": SERVICE_NAME, "status": "running"}
    service_info["timestamp"] = datetime.utcnow().isoformat()
    service_info["auth_enabled"] = AUTH_ENABLED
    return service_info

338 

339 

340# ============================================================================= 

341# WEBSOCKET ENDPOINT - Real-time Updates 

342# ============================================================================= 

343 

@app.websocket("/ws/updates")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time dashboard updates.

    Events broadcasted:
    - services:health - Service health status updates
    - websdrs:status - WebSDR receiver status changes
    - signals:detected - New signal detections
    - localizations:updated - New localization points

    Client messages are JSON objects with an ``event`` field:
    - ``ping`` is answered with a ``pong`` carrying the current UTC time.
    - ``subscribe`` / ``unsubscribe`` read ``data.event_name`` and pass it to
      the connection manager.
    Messages that are not JSON objects, or whose ``data`` is not an object,
    are ignored rather than closing the connection.
    """
    await ws_manager.connect(websocket)

    # Start heartbeat task; it is cancelled in the finally block below.
    heartbeat = asyncio.create_task(heartbeat_task(websocket))

    try:
        while True:
            # Receive messages from client (e.g., ping, subscribe events)
            data = await websocket.receive_json()

            # Valid JSON can still be a list/string/number; only objects
            # carry an "event" field.
            if not isinstance(data, dict):
                logger.warning("Ignoring WebSocket message that is not a JSON object")
                continue

            event = data.get("event")

            # Handle ping/pong
            if event == "ping":
                await websocket.send_json({
                    "event": "pong",
                    "data": {},
                    "timestamp": datetime.utcnow().isoformat()
                })

            # Handle subscription requests (future enhancement)
            elif event in ("subscribe", "unsubscribe"):
                payload = data.get("data")
                # "data" may be absent, null or a non-object; treat all of
                # those as "no event name supplied".
                event_name = payload.get("event_name") if isinstance(payload, dict) else None
                if event_name:
                    if event == "subscribe":
                        ws_manager.subscribe(websocket, event_name)
                    else:
                        ws_manager.unsubscribe(websocket, event_name)

    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected normally")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        heartbeat.cancel()
        ws_manager.disconnect(websocket)

393 

394 

395# ============================================================================= 

396# AUTHENTICATION ENDPOINTS 

397# ============================================================================= 

398 

@app.post("/api/v1/auth/login")
async def login_proxy(request: Request):
    """
    Proxy OAuth2 token request to Keycloak.

    This endpoint proxies login requests to Keycloak, allowing the frontend
    to request tokens through the API Gateway (which has CORS enabled).

    Accepts both:
    - application/x-www-form-urlencoded (from OAuth2 clients)
    - application/json (for convenience)

    Returns:
        JSONResponse carrying Keycloak's status code and JSON body. A 400 is
        returned for a malformed JSON body or missing credentials, and a 500
        for any unexpected failure while proxying.
    """
    try:
        # Parse body based on content type
        content_type = request.headers.get("content-type", "").lower()

        if "application/json" in content_type:
            # Parse JSON body; a malformed or non-object body is a client
            # error, not a server failure.
            try:
                body = await request.json()
            except ValueError:
                body = None
            if not isinstance(body, dict):
                return JSONResponse(
                    status_code=400,
                    content={"error": "Invalid JSON body"}
                )
            email = body.get("email") or body.get("username")
            password = body.get("password")
            logger.debug(f"📋 Parsed JSON body: email={email}")
        else:
            # Parse form-urlencoded body (OAuth2 standard)
            form_data = await request.form()
            email = form_data.get("username") or form_data.get("email")
            password = form_data.get("password")
            logger.debug(f"📋 Parsed form-urlencoded body: email={email}")

        # Validate required fields
        if not email or not password:
            return JSONResponse(
                status_code=400,
                content={"error": "Missing email/username or password"}
            )

        # Keycloak token endpoint
        keycloak_url = os.getenv("KEYCLOAK_URL", "http://keycloak:8080")
        keycloak_realm = os.getenv("KEYCLOAK_REALM", "heimdall")
        client_id = os.getenv("VITE_KEYCLOAK_CLIENT_ID", "heimdall-frontend")
        token_endpoint = f"{keycloak_url}/realms/{keycloak_realm}/protocol/openid-connect/token"

        logger.info(f"🔐 Proxying login to: {token_endpoint}")

        # Build form data for Keycloak (it expects form-urlencoded, not JSON)
        token_request = {
            "client_id": client_id,
            "username": email,
            "password": password,
            "grant_type": "password"
        }

        # Forward to Keycloak
        async with httpx.AsyncClient() as client:
            response = await client.post(
                token_endpoint,
                data=token_request,  # Use data= for form-urlencoded
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )

        logger.info(f"🔐 Keycloak response: {response.status_code}")

        # Return Keycloak response with CORS headers (added by middleware)
        if response.status_code == 200:
            return JSONResponse(
                status_code=response.status_code,
                content=response.json()
            )

        # Error bodies are not guaranteed to be JSON; keep Keycloak's status
        # code and fall back to a generic message when the body is unusable.
        try:
            error_content = response.json() if response.text else {}
        except ValueError:
            error_content = {}
        logger.warning(f"⚠️ Keycloak error: {error_content}")
        return JSONResponse(
            status_code=response.status_code,
            content=error_content or {"error": "Authentication failed"}
        )
    except Exception as e:
        logger.error(f"❌ Login proxy error: {str(e)}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error"}
        )

480 

481 

@app.get("/api/v1/auth/check")
async def auth_check(user: User = Depends(get_current_user)):
    """Report whether authentication is enabled and who the caller is.

    Both ``authenticated`` and ``auth_enabled`` mirror AUTH_ENABLED. The
    ``user`` entry holds the resolved user's fields when authentication is
    enabled and is ``None`` otherwise.
    """
    user_summary = None
    if AUTH_ENABLED:
        user_summary = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "roles": user.roles,
            "is_admin": user.is_admin,
            "is_operator": user.is_operator,
            "is_viewer": user.is_viewer,
        }

    return {
        "authenticated": AUTH_ENABLED,
        "auth_enabled": AUTH_ENABLED,
        "user": user_summary,
    }

498 

499 

500# ============================================================================= 

501# USER PROFILE & PREFERENCES ENDPOINTS (Keycloak-based) 

502# ============================================================================= 

503 

# The profile routes are only registered when authentication is available.
if AUTH_ENABLED:

    @app.get("/api/v1/auth/me")
    async def get_current_user_info(user: User = Depends(get_current_user)):
        """Return the identity and role flags of the authenticated caller."""
        exposed_fields = (
            "id",
            "username",
            "email",
            "roles",
            "is_admin",
            "is_operator",
            "is_viewer",
        )
        return {field: getattr(user, field) for field in exposed_fields}

    @app.get("/api/v1/profile")
    async def get_user_profile(user: User = Depends(get_current_user)):
        """Return a profile view of the authenticated caller.

        ``first_name`` is the part of the username before any "@";
        ``last_name`` is always empty. ``created_at`` and ``last_login`` are
        filled with the current UTC time on every call rather than stored
        values.
        """
        # TODO: Extend with additional profile data from database if needed
        first_name = ""
        if user.username:
            first_name = user.username.split("@")[0]

        profile = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "roles": user.roles,
            "first_name": first_name,
            "last_name": "",
        }
        profile["created_at"] = datetime.utcnow().isoformat()
        profile["last_login"] = datetime.utcnow().isoformat()
        return profile

532 

533 

534# ============================================================================= 

535# SYSTEM STATUS & METRICS ENDPOINTS 

536# ============================================================================= 

537 

@app.get("/api/v1/config")
async def get_config():
    """Return client-facing configuration.

    The receiver count, bands, duration and frequency limits are fixed values;
    the Keycloak realm and URL are read from the environment with defaults.
    """
    realm = os.getenv("KEYCLOAK_REALM", "heimdall")
    keycloak_base = os.getenv("KEYCLOAK_URL", "http://keycloak:8080")
    return dict(
        websdrs=7,
        supported_bands=["2m", "70cm"],
        max_duration_seconds=300,
        min_frequency_mhz=144.0,
        max_frequency_mhz=146.0,
        keycloak_realm=realm,
        keycloak_url=keycloak_base,
    )

550 

551 

@app.get("/api/v1/stats")
async def get_dashboard_stats():
    """Return dashboard statistics.

    The figures are fixed placeholder values; nothing is read from a database
    yet.
    """
    # TODO: Implement real stats aggregation from database
    placeholder_stats = dict(
        total_sessions=0,
        active_sessions=0,
        completed_predictions=0,
        average_accuracy_m=0.0,
        websdrs_online=7,
        uptime_percentage=100.0,
    )
    return placeholder_stats

564 

565 

# Strong references to fire-and-forget broadcast tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be garbage
# collected before it finishes.
_background_tasks = set()


async def _probe_service_health(client, service_name, url):
    """Query one service's /health endpoint and classify the outcome."""
    try:
        response = await client.get(f"{url}/health")
        status = "healthy" if response.status_code == 200 else "unhealthy"
    except Exception:
        status = "unreachable"
    return {"name": service_name, "status": status, "url": url}


@app.get("/api/v1/system/status")
async def get_system_status():
    """Aggregate health status from all services.

    Each service's /health endpoint is probed concurrently with a 5 second
    timeout. A service is "healthy" on HTTP 200, "unhealthy" on any other
    status and "unreachable" when the call fails. The overall status is
    "healthy" only if every service is healthy, otherwise "degraded".

    The per-service statuses are also broadcast to WebSocket clients as a
    "services:health" event without delaying the HTTP response.
    """
    service_urls = {
        "api-gateway": settings.api_gateway_url,
        "rf-acquisition": RF_ACQUISITION_URL,
        "data-ingestion-web": DATA_INGESTION_URL,
        "inference": INFERENCE_URL,
        "training": TRAINING_URL,
    }

    async with httpx.AsyncClient(timeout=5.0) as client:
        # gather() keeps the results in the same order as service_urls.
        services_health = await asyncio.gather(*(
            _probe_service_health(client, service_name, url)
            for service_name, url in service_urls.items()
        ))

    overall_healthy = all(s["status"] == "healthy" for s in services_health)

    result = {
        "overall_status": "healthy" if overall_healthy else "degraded",
        "services": services_health,
        "timestamp": datetime.utcnow().isoformat(),
    }

    # Broadcast to WebSocket clients
    services_health_dict = {s["name"]: {"status": s["status"]} for s in services_health}
    broadcast = asyncio.create_task(ws_manager.broadcast("services:health", services_health_dict))
    # Hold a reference until the task completes, then drop it.
    _background_tasks.add(broadcast)
    broadcast.add_done_callback(_background_tasks.discard)

    return result

605 

606 

if __name__ == "__main__":
    # NOTE(review): this module uses package-relative imports (e.g.
    # ``from .config import settings``), so this guard presumably only works
    # when the file is launched as part of its package - confirm.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": SERVICE_PORT}
    uvicorn.run(app, **server_options)