Coverage for services/data-ingestion-web/src/routers/sessions.py: 25%
110 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
2"""
3Recording sessions API endpoints
4"""
5from datetime import datetime
6from typing import List, Optional
7from uuid import UUID
8from fastapi import APIRouter, HTTPException, Query
9import asyncpg
10import logging
12from ..models.session import (
13 RecordingSession,
14 RecordingSessionCreate,
15 RecordingSessionWithDetails,
16 SessionListResponse,
17 SessionAnalytics,
18 KnownSource,
19 KnownSourceCreate,
20)
21from ..db import get_pool
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router that carries every endpoint declared in this module; all of their
# paths are relative to the /api/v1/sessions prefix.
router = APIRouter(prefix="/api/v1/sessions", tags=["sessions"])
@router.get("", response_model=SessionListResponse)
async def list_sessions(
    page: int = Query(1, ge=1, description="Page number"),
    per_page: int = Query(20, ge=1, le=100, description="Items per page"),
    status: Optional[str] = Query(None, description="Filter by status"),
    approval_status: Optional[str] = Query(None, description="Filter by approval status"),
):
    """List recording sessions, newest first, with pagination and filters.

    Args:
        page: 1-based page number.
        per_page: Page size (1-100).
        status: When non-empty, only sessions with this status are returned.
        approval_status: When non-empty, only sessions with this approval
            status are returned.

    Returns:
        SessionListResponse holding the requested page of sessions (each with
        its known-source details and a measurement count) and the total
        number of sessions matching the same filters.
    """
    pool = await get_pool()
    offset = (page - 1) * per_page

    # Filter values come straight from the query string, so they are bound as
    # positional parameters and never interpolated into the SQL text. Only the
    # hard-coded column names and the "$n" placeholders are formatted in.
    conditions = []
    filter_args = []
    for column, value in (
        ("rs.status", status),
        ("rs.approval_status", approval_status),
    ):
        if value:
            filter_args.append(value)
            conditions.append(f"{column} = ${len(filter_args)}")

    # The filter has to live in a WHERE clause: appended to the LEFT JOIN's ON
    # condition it would only restrict which measurements get counted, not
    # which sessions are returned, and the page would disagree with `total`.
    where_sql = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    # LIMIT/OFFSET placeholders are numbered after the filter placeholders.
    limit_pos = len(filter_args) + 1
    offset_pos = limit_pos + 1

    query = f"""
        SELECT
            rs.id,
            rs.known_source_id,
            rs.session_name,
            rs.session_start,
            rs.session_end,
            rs.duration_seconds,
            rs.celery_task_id,
            rs.status,
            rs.approval_status,
            rs.notes,
            rs.created_at,
            rs.updated_at,
            ks.name as source_name,
            ks.frequency_hz as source_frequency,
            ks.latitude as source_latitude,
            ks.longitude as source_longitude,
            COUNT(m.id) as measurements_count
        FROM heimdall.recording_sessions rs
        JOIN heimdall.known_sources ks ON rs.known_source_id = ks.id
        LEFT JOIN heimdall.measurements m ON m.created_at >= rs.session_start
            AND (rs.session_end IS NULL OR m.created_at <= rs.session_end)
        {where_sql}
        GROUP BY rs.id, ks.name, ks.frequency_hz, ks.latitude, ks.longitude
        ORDER BY rs.session_start DESC
        LIMIT ${limit_pos} OFFSET ${offset_pos}
    """

    count_query = f"""
        SELECT COUNT(*)
        FROM heimdall.recording_sessions rs
        {where_sql}
    """

    async with pool.acquire() as conn:
        # Page of sessions
        rows = await conn.fetch(query, *filter_args, per_page, offset)
        # Total number of sessions matching the same filters
        total = await conn.fetchval(count_query, *filter_args)

    # The selected column aliases match the model's field names one-to-one.
    sessions = [RecordingSessionWithDetails(**dict(row)) for row in rows]

    return SessionListResponse(
        sessions=sessions,
        total=total,
        page=page,
        per_page=per_page,
    )
@router.get("/analytics", response_model=SessionAnalytics)
async def get_session_analytics():
    """Return aggregate statistics computed over every recording session.

    Returns:
        SessionAnalytics with per-status session counts, the success rate as
        a percentage of all sessions, the total measurement count, the mean
        session duration and the mean accuracy of completed sessions.
    """
    summary_sql = """
        SELECT
            COUNT(*) as total_sessions,
            COUNT(*) FILTER (WHERE status = 'completed') as completed_sessions,
            COUNT(*) FILTER (WHERE status = 'failed') as failed_sessions,
            COUNT(*) FILTER (WHERE status = 'pending') as pending_sessions,
            AVG(duration_seconds) as average_duration_seconds,
            SUM((SELECT COUNT(*) FROM heimdall.measurements m
                 WHERE m.created_at >= rs.session_start
                 AND (rs.session_end IS NULL OR m.created_at <= rs.session_end))) as total_measurements
        FROM heimdall.recording_sessions rs
    """

    # NOTE(review): this reads uncertainty_meters from recording_sessions
    # itself rather than from a separate inference-results table -- confirm
    # that the column exists there.
    accuracy_sql = """
        SELECT AVG(uncertainty_meters) as avg_accuracy
        FROM heimdall.recording_sessions rs
        WHERE rs.status = 'completed'
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        summary = await conn.fetchrow(summary_sql)
        accuracy = await conn.fetchrow(accuracy_sql)

    # Missing/NULL counts are reported as zero.
    n_total = summary["total_sessions"] or 0
    n_completed = summary["completed_sessions"] or 0
    n_failed = summary["failed_sessions"] or 0
    n_pending = summary["pending_sessions"] or 0

    # Guard against division by zero when no session exists yet.
    if n_total > 0:
        success_rate = n_completed / n_total * 100
    else:
        success_rate = 0.0

    mean_accuracy = None
    if accuracy:
        mean_accuracy = accuracy["avg_accuracy"]

    return SessionAnalytics(
        total_sessions=n_total,
        completed_sessions=n_completed,
        failed_sessions=n_failed,
        pending_sessions=n_pending,
        success_rate=success_rate,
        total_measurements=summary["total_measurements"] or 0,
        average_duration_seconds=summary["average_duration_seconds"],
        average_accuracy_meters=mean_accuracy,
    )
@router.get("/{session_id}", response_model=RecordingSessionWithDetails)
async def get_session(session_id: UUID):
    """Fetch one recording session with its source details.

    Args:
        session_id: Identifier of the session to look up.

    Returns:
        RecordingSessionWithDetails for the session, including the joined
        known-source fields and the measurement count.

    Raises:
        HTTPException: 404 when no session has this id.
    """
    detail_sql = """
        SELECT
            rs.id,
            rs.known_source_id,
            rs.session_name,
            rs.session_start,
            rs.session_end,
            rs.duration_seconds,
            rs.celery_task_id,
            rs.status,
            rs.approval_status,
            rs.notes,
            rs.created_at,
            rs.updated_at,
            ks.name as source_name,
            ks.frequency_hz as source_frequency,
            ks.latitude as source_latitude,
            ks.longitude as source_longitude,
            COUNT(m.id) as measurements_count
        FROM heimdall.recording_sessions rs
        JOIN heimdall.known_sources ks ON rs.known_source_id = ks.id
        LEFT JOIN heimdall.measurements m ON m.created_at >= rs.session_start
            AND (rs.session_end IS NULL OR m.created_at <= rs.session_end)
        WHERE rs.id = $1
        GROUP BY rs.id, ks.name, ks.frequency_hz, ks.latitude, ks.longitude
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(detail_sql, session_id)

    if record:
        # The selected column aliases are exactly the model's field names.
        return RecordingSessionWithDetails(**dict(record))

    raise HTTPException(status_code=404, detail="Session not found")
@router.post("", response_model=RecordingSession, status_code=201)
async def create_session(session: RecordingSessionCreate):
    """Create a new recording session in the 'pending' state.

    Args:
        session: Payload carrying the known source id, the session name and
            optional notes.

    Returns:
        The newly inserted RecordingSession.

    Raises:
        HTTPException: 404 when the referenced known source does not exist.
    """
    pool = await get_pool()

    async with pool.acquire() as conn:
        # Verify the known source exists before inserting.
        source_exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM heimdall.known_sources WHERE id = $1)",
            session.known_source_id,
        )
        if not source_exists:
            raise HTTPException(status_code=404, detail="Known source not found")

        # session_start is stamped by the database with NOW(). The status
        # update endpoint later derives duration_seconds as
        # NOW() - session_start, so both ends of the interval come from the
        # same clock. This also avoids sending a naive datetime.utcnow()
        # value (deprecated since Python 3.12) through the driver.
        query = """
            INSERT INTO heimdall.recording_sessions
                (known_source_id, session_name, session_start, status, approval_status, notes)
            VALUES ($1, $2, NOW(), 'pending', 'pending', $3)
            RETURNING
                id, known_source_id, session_name, session_start, session_end,
                duration_seconds, celery_task_id, status, approval_status,
                notes, created_at, updated_at
        """
        row = await conn.fetchrow(
            query,
            session.known_source_id,
            session.session_name,
            session.notes,
        )

    return RecordingSession(**dict(row))
@router.patch("/{session_id}/status")
async def update_session_status(
    session_id: UUID,
    status: str,
    celery_task_id: Optional[str] = None,
):
    """Update a session's status and, for terminal states, close it.

    Moving to 'completed' or 'failed' also sets session_end to the current
    database time and recomputes duration_seconds from session_start.

    Args:
        session_id: Identifier of the session to update.
        status: New status; one of pending, in_progress, completed, failed.
        celery_task_id: Optional task id to store; when omitted the stored
            value is kept.

    Returns:
        The updated RecordingSession.

    Raises:
        HTTPException: 400 for an unknown status, 404 when the session does
            not exist.
    """
    valid_statuses = ["pending", "in_progress", "completed", "failed"]
    if status not in valid_statuses:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}",
        )

    # Decide "is this a terminal state?" here and bind it as its own boolean
    # parameter. Reusing $1 both as the assigned value and inside a comparison
    # with text literals can make PostgreSQL reject the statement with
    # "inconsistent types deduced for parameter $1" when the status column is
    # not plain TEXT (e.g. VARCHAR).
    is_terminal = status in ("completed", "failed")

    query = """
        UPDATE heimdall.recording_sessions
        SET status = $1,
            celery_task_id = COALESCE($2, celery_task_id),
            session_end = CASE WHEN $4::boolean THEN NOW() ELSE session_end END,
            duration_seconds = CASE WHEN $4::boolean
                THEN EXTRACT(EPOCH FROM (NOW() - session_start))
                ELSE duration_seconds END,
            updated_at = NOW()
        WHERE id = $3
        RETURNING
            id, known_source_id, session_name, session_start, session_end,
            duration_seconds, celery_task_id, status, approval_status,
            notes, created_at, updated_at
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            query, status, celery_task_id, session_id, is_terminal
        )

    # No row returned means the WHERE clause matched nothing.
    if not row:
        raise HTTPException(status_code=404, detail="Session not found")

    return RecordingSession(**dict(row))
@router.patch("/{session_id}/approval")
async def update_session_approval(session_id: UUID, approval_status: str):
    """Set the approval status of a recording session.

    Args:
        session_id: Identifier of the session to update.
        approval_status: New value; one of pending, approved, rejected.

    Returns:
        The updated RecordingSession.

    Raises:
        HTTPException: 400 for an unknown approval status, 404 when the
            session does not exist.
    """
    pool = await get_pool()

    allowed = ["pending", "approved", "rejected"]
    if approval_status not in allowed:
        message = f"Invalid approval status. Must be one of: {', '.join(allowed)}"
        raise HTTPException(status_code=400, detail=message)

    update_sql = """
        UPDATE heimdall.recording_sessions
        SET approval_status = $1, updated_at = NOW()
        WHERE id = $2
        RETURNING
            id, known_source_id, session_name, session_start, session_end,
            duration_seconds, celery_task_id, status, approval_status,
            notes, created_at, updated_at
    """

    async with pool.acquire() as conn:
        updated = await conn.fetchrow(update_sql, approval_status, session_id)

    # RETURNING yields no row when the id matched nothing.
    if updated:
        return RecordingSession(**dict(updated))

    raise HTTPException(status_code=404, detail="Session not found")
@router.delete("/{session_id}", status_code=204)
async def delete_session(session_id: UUID):
    """Remove a recording session.

    Args:
        session_id: Identifier of the session to delete.

    Raises:
        HTTPException: 404 when no session has this id.
    """
    delete_sql = "DELETE FROM heimdall.recording_sessions WHERE id = $1"

    pool = await get_pool()
    async with pool.acquire() as conn:
        command_tag = await conn.execute(delete_sql, session_id)

    # execute() hands back the command status string; "DELETE 0" means the
    # statement removed no row.
    nothing_deleted = command_tag == "DELETE 0"
    if nothing_deleted:
        raise HTTPException(status_code=404, detail="Session not found")

    return None
# NOTE(review): this GET route is registered after GET "/{session_id}" on the
# same router. Routes are matched in declaration order, so a request for
# ".../known-sources" is presumably captured by that earlier route and fails
# UUID validation before reaching this handler -- confirm, and consider
# declaring this route ahead of the "/{session_id}" one.
@router.get("/known-sources", response_model=List[KnownSource])
async def list_known_sources():
    """Return every known RF source, ordered by name."""
    select_sql = """
        SELECT id, name, description, frequency_hz, latitude, longitude,
               power_dbm, source_type, is_validated, created_at, updated_at
        FROM heimdall.known_sources
        ORDER BY name
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(select_sql)

    sources = []
    for record in records:
        sources.append(KnownSource(**dict(record)))
    return sources
@router.post("/known-sources", response_model=KnownSource, status_code=201)
async def create_known_source(source: KnownSourceCreate):
    """Insert a new known RF source.

    Args:
        source: Payload with the source's name, description, frequency,
            coordinates, power, type and validation flag.

    Returns:
        The newly inserted KnownSource.

    Raises:
        HTTPException: 400 when the insert hits a unique-constraint
            violation (reported as a duplicate name).
    """
    insert_sql = """
        INSERT INTO heimdall.known_sources
            (name, description, frequency_hz, latitude, longitude, power_dbm, source_type, is_validated)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING id, name, description, frequency_hz, latitude, longitude,
                  power_dbm, source_type, is_validated, created_at, updated_at
    """

    pool = await get_pool()
    async with pool.acquire() as conn:
        # Positional values, in the same order as the column list above.
        values = (
            source.name,
            source.description,
            source.frequency_hz,
            source.latitude,
            source.longitude,
            source.power_dbm,
            source.source_type,
            source.is_validated,
        )
        try:
            inserted = await conn.fetchrow(insert_sql, *values)
        except asyncpg.UniqueViolationError:
            raise HTTPException(
                status_code=400,
                detail="A known source with this name already exists"
            )
        else:
            return KnownSource(**dict(inserted))