Coverage for services/api-gateway/src/main.py: 38%
262 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1from datetime import datetime, timedelta
2from fastapi import FastAPI, Request, HTTPException, Depends, WebSocket, WebSocketDisconnect
3from fastapi.middleware.cors import CORSMiddleware
4from fastapi.responses import JSONResponse
5import httpx
6import logging
7import os
8import sys
9import asyncio
11# Add parent directory to path for auth module
12sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..', 'common'))
14from .config import settings
15from .models.health import HealthResponse
16from .websocket_manager import manager as ws_manager, heartbeat_task
18# Import Keycloak authentication
19try:
20 from auth.keycloak_auth import get_current_user, require_admin, require_operator
21 from auth.models import User
22 AUTH_ENABLED = True
23 logger = logging.getLogger(__name__)
24 logger.info("✅ Keycloak authentication enabled")
25except ImportError as e:
26 logger = logging.getLogger(__name__)
27 logger.warning(f"⚠️ Authentication disabled - could not import auth module: {e}")
28 AUTH_ENABLED = False
30 # Define dummy user for when auth is disabled
31 class User:
32 def __init__(self):
33 self.id = "anonymous"
34 self.username = "anonymous"
35 self.email = "anonymous@heimdall.local"
36 self.roles = []
37 self.is_admin = False
38 self.is_operator = False
39 self.is_viewer = False
41 # Create a dummy dependency that never fails
42 async def get_current_user():
43 """Dummy auth - returns anonymous user when auth is disabled."""
44 return User()
46 def require_admin():
47 async def _require_admin(user: User = Depends(get_current_user)):
48 return user
49 return _require_admin
51 def require_operator():
52 async def _require_operator(user: User = Depends(get_current_user)):
53 return user
54 return _require_operator
56SERVICE_NAME = "api-gateway"
57SERVICE_VERSION = "0.1.0"
58SERVICE_PORT = 8000
60# Backend service URLs - from settings
61RF_ACQUISITION_URL = settings.rf_acquisition_url
62INFERENCE_URL = settings.inference_url
63TRAINING_URL = settings.training_url
64DATA_INGESTION_URL = settings.data_ingestion_url
66app = FastAPI(title=f"Heimdall SDR - {SERVICE_NAME}", version=SERVICE_VERSION)
68app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
71async def proxy_request(request: Request, target_url: str):
72 """
73 Proxy HTTP request to backend service.
75 Args:
76 request: Incoming FastAPI request
77 target_url: Target backend service URL
79 Returns:
80 JSONResponse with backend response
81 """
82 # Build full target URL
83 path = request.url.path
84 query = str(request.url.query)
85 full_url = f"{target_url}{path}"
86 if query:
87 full_url = f"{full_url}?{query}"
89 logger.info(f"🔄 Proxy: {request.method} {full_url}")
91 # Get request body if present
92 body = None
93 if request.method in ["POST", "PUT", "PATCH"]:
94 body = await request.body()
96 # Forward headers (excluding host)
97 headers = dict(request.headers)
98 headers.pop('host', None)
100 async with httpx.AsyncClient(timeout=30.0) as client:
101 try:
102 response = await client.request(
103 method=request.method,
104 url=full_url,
105 headers=headers,
106 content=body,
107 )
109 logger.info(f"✅ Proxy response: {response.status_code} from {full_url}")
111 # Parse response content safely
112 try:
113 content = response.json() if response.text else {}
114 except Exception as json_err:
115 logger.warning(f"⚠️ Response is not JSON: {str(json_err)}")
116 content = {"raw": response.text[:500]}
118 return JSONResponse(
119 content=content,
120 status_code=response.status_code,
121 headers=dict(response.headers)
122 )
123 except httpx.TimeoutException:
124 logger.error(f"❌ Timeout proxying request to {full_url}")
125 raise HTTPException(status_code=504, detail="Backend service timeout")
126 except httpx.ConnectError as e:
127 logger.error(f"❌ Connection error proxying request to {full_url}: {str(e)}")
128 raise HTTPException(status_code=503, detail="Backend service unavailable")
129 except Exception as e:
130 logger.exception(f"❌ Error proxying request to {full_url}: {str(e)}")
131 raise HTTPException(status_code=500, detail=f"Proxy error: {str(e)}")
134# =============================================================================
135# PUBLIC HEALTH ENDPOINTS - No authentication required
136# =============================================================================
138@app.get("/health")
139async def health_check():
140 """Health check for API Gateway."""
141 return HealthResponse(
142 status="healthy",
143 service=SERVICE_NAME,
144 version=SERVICE_VERSION,
145 timestamp=datetime.utcnow()
146 )
149@app.get("/ready")
150async def readiness_check():
151 """Readiness check for API Gateway."""
152 return {"ready": True}
155# These must be defined BEFORE the catch-all {path:path} routes to take precedence
156@app.get("/api/v1/acquisition/health")
157async def acquisition_health_public(request: Request):
158 """Public health check endpoint for RF Acquisition service."""
159 logger.debug(f"📡 Public health check: /api/v1/acquisition/health")
160 return await proxy_request(request, RF_ACQUISITION_URL)
163@app.get("/api/v1/acquisition/websdrs")
164async def acquisition_websdrs_public(request: Request):
165 """Public endpoint to get WebSDR configuration."""
166 logger.debug(f"📡 Public WebSDRs endpoint: /api/v1/acquisition/websdrs")
167 return await proxy_request(request, RF_ACQUISITION_URL)
170@app.get("/api/v1/inference/health")
171async def inference_health_public(request: Request):
172 """Public health check endpoint for Inference service."""
173 logger.debug(f"🧠 Public health check: /api/v1/inference/health")
174 return await proxy_request(request, INFERENCE_URL)
177@app.get("/api/v1/api-gateway/health")
178async def api_gateway_health_public():
179 """Health check for API Gateway service."""
180 logger.debug(f"🌐 API Gateway health check: /api/v1/api-gateway/health")
181 return HealthResponse(
182 status="healthy",
183 service=SERVICE_NAME,
184 version=SERVICE_VERSION,
185 timestamp=datetime.utcnow()
186 )
189@app.get("/api/v1/rf-acquisition/health")
190async def rf_acquisition_health_public():
191 """Health check endpoint for RF Acquisition service - maps to backend /health."""
192 logger.debug(f"📡 RF Acquisition health check: /api/v1/rf-acquisition/health")
193 async with httpx.AsyncClient(timeout=10.0) as client:
194 try:
195 response = await client.get(f"{RF_ACQUISITION_URL}/health")
196 if response.status_code == 200:
197 return response.json()
198 else:
199 return HealthResponse(status="unhealthy", service="rf-acquisition", version="unknown", timestamp=datetime.utcnow())
200 except Exception as e:
201 logger.warning(f"⚠️ Could not reach rf-acquisition health: {str(e)}")
202 return HealthResponse(status="unhealthy", service="rf-acquisition", version="unknown", timestamp=datetime.utcnow())
205@app.get("/api/v1/inference/health")
206async def inference_health_check():
207 """Health check endpoint for Inference service - maps to backend /health."""
208 logger.debug(f"🧠 Inference health check: /api/v1/inference/health")
209 async with httpx.AsyncClient(timeout=10.0) as client:
210 try:
211 response = await client.get(f"{INFERENCE_URL}/health")
212 if response.status_code == 200:
213 return response.json()
214 else:
215 return HealthResponse(status="unhealthy", service="inference", version="unknown", timestamp=datetime.utcnow())
216 except Exception as e:
217 logger.warning(f"⚠️ Could not reach inference health: {str(e)}")
218 return HealthResponse(status="unhealthy", service="inference", version="unknown", timestamp=datetime.utcnow())
221# OLD ENDPOINTS - Kept for backward compatibility but use /api/v1/{service}/health instead
222@app.get("/api/v1/acquisition/health")
223async def acquisition_health_deprecated(request: Request):
224 """DEPRECATED: Use /api/v1/rf-acquisition/health instead."""
225 logger.debug(f"📡 Deprecated: /api/v1/acquisition/health → /api/v1/rf-acquisition/health")
226 async with httpx.AsyncClient(timeout=10.0) as client:
227 try:
228 response = await client.get(f"{RF_ACQUISITION_URL}/health")
229 if response.status_code == 200:
230 return response.json()
231 else:
232 return HealthResponse(status="unhealthy", service="rf-acquisition", version="unknown", timestamp=datetime.utcnow())
233 except Exception as e:
234 raise HTTPException(status_code=503, detail="rf-acquisition unavailable")
237# =============================================================================
238# =============================================================================
239# PROTECTED ACQUISITION ENDPOINTS - Requires auth (falls back to anonymous if disabled)
240# =============================================================================
242@app.api_route("/api/v1/acquisition/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
243async def proxy_to_rf_acquisition(
244 request: Request,
245 path: str,
246 user: User = Depends(get_current_user)
247):
248 """Proxy requests to RF Acquisition service (requires authentication)."""
249 if AUTH_ENABLED and not user.is_operator:
250 raise HTTPException(status_code=403, detail="Operator access required")
251 logger.debug(f"📡 Acquisition route matched: path={path} (user={user.username})")
252 return await proxy_request(request, RF_ACQUISITION_URL)
255# =============================================================================
256# PROTECTED INFERENCE ENDPOINTS - Requires auth (falls back to anonymous if disabled)
257# =============================================================================
259@app.api_route("/api/v1/inference/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
260async def proxy_to_inference(
261 request: Request,
262 path: str,
263 user: User = Depends(get_current_user)
264):
265 """Proxy requests to Inference service (requires authentication)."""
266 if AUTH_ENABLED and not user.is_viewer:
267 raise HTTPException(status_code=403, detail="Viewer access required")
268 logger.debug(f"🧠 Inference route matched: path={path} (user={user.username})")
269 return await proxy_request(request, INFERENCE_URL)
272# =============================================================================
273# PROTECTED TRAINING ENDPOINTS
274# =============================================================================
276# =============================================================================
277# PROTECTED TRAINING ENDPOINTS - Requires auth (falls back to anonymous if disabled)
278# =============================================================================
280@app.api_route("/api/v1/training/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
281async def proxy_to_training(
282 request: Request,
283 path: str,
284 user: User = Depends(get_current_user)
285):
286 """Proxy requests to Training service (requires authentication)."""
287 if AUTH_ENABLED and not user.is_operator:
288 raise HTTPException(status_code=403, detail="Operator access required")
289 logger.debug(f"📚 Training route matched: path={path} (user={user.username})")
290 return await proxy_request(request, TRAINING_URL)
293# =============================================================================
294# PROTECTED DATA INGESTION ENDPOINTS
295# =============================================================================
297# =============================================================================
298# PROTECTED DATA INGESTION ENDPOINTS - Requires auth (falls back to anonymous if disabled)
299# =============================================================================
301@app.api_route("/api/v1/sessions/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
302async def proxy_to_data_ingestion(
303 request: Request,
304 path: str,
305 user: User = Depends(get_current_user)
306):
307 """Proxy requests to Data Ingestion service (requires authentication)."""
308 if AUTH_ENABLED and not user.is_operator:
309 raise HTTPException(status_code=403, detail="Operator access required")
310 logger.debug(f"💾 Data Ingestion route matched: path={path} (user={user.username})")
311 return await proxy_request(request, DATA_INGESTION_URL)
314# =============================================================================
315# PUBLIC ANALYTICS ENDPOINTS
316# =============================================================================
318@app.api_route("/api/v1/analytics/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
319async def proxy_to_inference_analytics(request: Request, path: str):
320 """Proxy analytics requests to Inference service (public access for demo)."""
321 logger.debug(f"📊 Analytics route matched: path={path}")
322 return await proxy_request(request, INFERENCE_URL)
325# =============================================================================
326# ROOT & ROOT ENDPOINTS
327# =============================================================================
329@app.get("/")
330async def root():
331 """Root endpoint - returns service info."""
332 return {
333 "service": SERVICE_NAME,
334 "status": "running",
335 "timestamp": datetime.utcnow().isoformat(),
336 "auth_enabled": AUTH_ENABLED
337 }
340# =============================================================================
341# WEBSOCKET ENDPOINT - Real-time Updates
342# =============================================================================
344@app.websocket("/ws/updates")
345async def websocket_endpoint(websocket: WebSocket):
346 """
347 WebSocket endpoint for real-time dashboard updates.
349 Events broadcasted:
350 - services:health - Service health status updates
351 - websdrs:status - WebSDR receiver status changes
352 - signals:detected - New signal detections
353 - localizations:updated - New localization points
354 """
355 await ws_manager.connect(websocket)
357 # Start heartbeat task
358 heartbeat = asyncio.create_task(heartbeat_task(websocket))
360 try:
361 while True:
362 # Receive messages from client (e.g., ping, subscribe events)
363 data = await websocket.receive_json()
365 event = data.get("event")
367 # Handle ping/pong
368 if event == "ping":
369 await websocket.send_json({
370 "event": "pong",
371 "data": {},
372 "timestamp": datetime.utcnow().isoformat()
373 })
375 # Handle subscription requests (future enhancement)
376 elif event == "subscribe":
377 event_name = data.get("data", {}).get("event_name")
378 if event_name:
379 ws_manager.subscribe(websocket, event_name)
381 elif event == "unsubscribe":
382 event_name = data.get("data", {}).get("event_name")
383 if event_name:
384 ws_manager.unsubscribe(websocket, event_name)
386 except WebSocketDisconnect:
387 logger.info("WebSocket client disconnected normally")
388 except Exception as e:
389 logger.error(f"WebSocket error: {e}")
390 finally:
391 heartbeat.cancel()
392 ws_manager.disconnect(websocket)
395# =============================================================================
396# AUTHENTICATION ENDPOINTS
397# =============================================================================
399@app.post("/api/v1/auth/login")
400async def login_proxy(request: Request):
401 """
402 Proxy OAuth2 token request to Keycloak.
404 This endpoint proxies login requests to Keycloak, allowing the frontend
405 to request tokens through the API Gateway (which has CORS enabled).
407 Accepts both:
408 - application/x-www-form-urlencoded (from OAuth2 clients)
409 - application/json (for convenience)
410 """
411 try:
412 # Parse body based on content type
413 content_type = request.headers.get("content-type", "").lower()
415 if "application/json" in content_type:
416 # Parse JSON body
417 body = await request.json()
418 email = body.get("email") or body.get("username")
419 password = body.get("password")
420 logger.debug(f"📋 Parsed JSON body: email={email}")
421 else:
422 # Parse form-urlencoded body (OAuth2 standard)
423 form_data = await request.form()
424 email = form_data.get("username") or form_data.get("email")
425 password = form_data.get("password")
426 logger.debug(f"📋 Parsed form-urlencoded body: email={email}")
428 # Validate required fields
429 if not email or not password:
430 return JSONResponse(
431 status_code=400,
432 content={"error": "Missing email/username or password"}
433 )
435 # Keycloak token endpoint
436 keycloak_url = os.getenv("KEYCLOAK_URL", "http://keycloak:8080")
437 keycloak_realm = os.getenv("KEYCLOAK_REALM", "heimdall")
438 client_id = os.getenv("VITE_KEYCLOAK_CLIENT_ID", "heimdall-frontend")
439 token_endpoint = f"{keycloak_url}/realms/{keycloak_realm}/protocol/openid-connect/token"
441 logger.info(f"🔐 Proxying login to: {token_endpoint}")
443 # Build form data for Keycloak (it expects form-urlencoded, not JSON)
444 form_data = {
445 "client_id": client_id,
446 "username": email,
447 "password": password,
448 "grant_type": "password"
449 }
451 # Forward to Keycloak
452 async with httpx.AsyncClient() as client:
453 response = await client.post(
454 token_endpoint,
455 data=form_data, # Use data= for form-urlencoded
456 headers={"Content-Type": "application/x-www-form-urlencoded"}
457 )
459 logger.info(f"🔐 Keycloak response: {response.status_code}")
461 # Return Keycloak response with CORS headers (added by middleware)
462 if response.status_code == 200:
463 return JSONResponse(
464 status_code=response.status_code,
465 content=response.json()
466 )
467 else:
468 error_content = response.json() if response.text else {}
469 logger.warning(f"⚠️ Keycloak error: {error_content}")
470 return JSONResponse(
471 status_code=response.status_code,
472 content=error_content or {"error": "Authentication failed"}
473 )
474 except Exception as e:
475 logger.error(f"❌ Login proxy error: {str(e)}", exc_info=True)
476 return JSONResponse(
477 status_code=500,
478 content={"error": "Internal server error"}
479 )
482@app.get("/api/v1/auth/check")
483async def auth_check(user: User = Depends(get_current_user)):
484 """Check authentication status and return user info."""
485 return {
486 "authenticated": AUTH_ENABLED,
487 "auth_enabled": AUTH_ENABLED,
488 "user": {
489 "id": user.id,
490 "username": user.username,
491 "email": user.email,
492 "roles": user.roles,
493 "is_admin": user.is_admin,
494 "is_operator": user.is_operator,
495 "is_viewer": user.is_viewer,
496 } if AUTH_ENABLED else None
497 }
500# =============================================================================
501# USER PROFILE & PREFERENCES ENDPOINTS (Keycloak-based)
502# =============================================================================
504if AUTH_ENABLED:
505 @app.get("/api/v1/auth/me")
506 async def get_current_user_info(user: User = Depends(get_current_user)):
507 """Get current authenticated user information from Keycloak token."""
508 return {
509 "id": user.id,
510 "username": user.username,
511 "email": user.email,
512 "roles": user.roles,
513 "is_admin": user.is_admin,
514 "is_operator": user.is_operator,
515 "is_viewer": user.is_viewer,
516 }
518 @app.get("/api/v1/profile")
519 async def get_user_profile(user: User = Depends(get_current_user)):
520 """Get user profile from Keycloak."""
521 # TODO: Extend with additional profile data from database if needed
522 return {
523 "id": user.id,
524 "username": user.username,
525 "email": user.email,
526 "roles": user.roles,
527 "first_name": user.username.split("@")[0] if user.username else "",
528 "last_name": "",
529 "created_at": datetime.utcnow().isoformat(),
530 "last_login": datetime.utcnow().isoformat(),
531 }
534# =============================================================================
535# SYSTEM STATUS & METRICS ENDPOINTS
536# =============================================================================
538@app.get("/api/v1/config")
539async def get_config():
540 """Get application configuration."""
541 return {
542 "websdrs": 7,
543 "supported_bands": ["2m", "70cm"],
544 "max_duration_seconds": 300,
545 "min_frequency_mhz": 144.0,
546 "max_frequency_mhz": 146.0,
547 "keycloak_realm": os.getenv("KEYCLOAK_REALM", "heimdall"),
548 "keycloak_url": os.getenv("KEYCLOAK_URL", "http://keycloak:8080"),
549 }
552@app.get("/api/v1/stats")
553async def get_dashboard_stats():
554 """Get dashboard statistics from database."""
555 # TODO: Implement real stats aggregation from database
556 return {
557 "total_sessions": 0,
558 "active_sessions": 0,
559 "completed_predictions": 0,
560 "average_accuracy_m": 0.0,
561 "websdrs_online": 7,
562 "uptime_percentage": 100.0,
563 }
566@app.get("/api/v1/system/status")
567async def get_system_status():
568 """Aggregate health status from all services."""
569 services_health = []
570 service_urls = {
571 "api-gateway": settings.api_gateway_url,
572 "rf-acquisition": RF_ACQUISITION_URL,
573 "data-ingestion-web": DATA_INGESTION_URL,
574 "inference": INFERENCE_URL,
575 "training": TRAINING_URL,
576 }
578 async with httpx.AsyncClient(timeout=5.0) as client:
579 for service_name, url in service_urls.items():
580 try:
581 response = await client.get(f"{url}/health")
582 status = "healthy" if response.status_code == 200 else "unhealthy"
583 except Exception:
584 status = "unreachable"
586 services_health.append({
587 "name": service_name,
588 "status": status,
589 "url": url,
590 })
592 overall_healthy = all(s["status"] == "healthy" for s in services_health)
594 result = {
595 "overall_status": "healthy" if overall_healthy else "degraded",
596 "services": services_health,
597 "timestamp": datetime.utcnow().isoformat(),
598 }
600 # Broadcast to WebSocket clients
601 services_health_dict = {s["name"]: {"status": s["status"]} for s in services_health}
602 asyncio.create_task(ws_manager.broadcast("services:health", services_health_dict))
604 return result
607if __name__ == "__main__":
608 import uvicorn
609 uvicorn.run(app, host="0.0.0.0", port=SERVICE_PORT)