Coverage for services/data-ingestion-web/src/routers/sessions.py: 25%

110 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1 

2""" 

3Recording sessions API endpoints 

4""" 

5from datetime import datetime 

6from typing import List, Optional 

7from uuid import UUID 

8from fastapi import APIRouter, HTTPException, Query 

9import asyncpg 

10import logging 

11 

12from ..models.session import ( 

13 RecordingSession, 

14 RecordingSessionCreate, 

15 RecordingSessionWithDetails, 

16 SessionListResponse, 

17 SessionAnalytics, 

18 KnownSource, 

19 KnownSourceCreate, 

20) 

21from ..db import get_pool 

22 

23logger = logging.getLogger(__name__) 

24 

25router = APIRouter(prefix="/api/v1/sessions", tags=["sessions"]) 

26 

27 

28@router.get("", response_model=SessionListResponse) 

29async def list_sessions( 

30 page: int = Query(1, ge=1, description="Page number"), 

31 per_page: int = Query(20, ge=1, le=100, description="Items per page"), 

32 status: Optional[str] = Query(None, description="Filter by status"), 

33 approval_status: Optional[str] = Query(None, description="Filter by approval status"), 

34): 

35 """List all recording sessions with pagination""" 

36 pool = await get_pool() 

37 

38 offset = (page - 1) * per_page 

39 

40 # Build query with filters 

41 where_clauses = [] 

42 if status: 

43 where_clauses.append(f"rs.status = '{status}'") 

44 if approval_status: 

45 where_clauses.append(f"rs.approval_status = '{approval_status}'") 

46 

47 where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else "" 

48 

49 query = f""" 

50 SELECT  

51 rs.id, 

52 rs.known_source_id, 

53 rs.session_name, 

54 rs.session_start, 

55 rs.session_end, 

56 rs.duration_seconds, 

57 rs.celery_task_id, 

58 rs.status, 

59 rs.approval_status, 

60 rs.notes, 

61 rs.created_at, 

62 rs.updated_at, 

63 ks.name as source_name, 

64 ks.frequency_hz as source_frequency, 

65 ks.latitude as source_latitude, 

66 ks.longitude as source_longitude, 

67 COUNT(m.id) as measurements_count 

68 FROM heimdall.recording_sessions rs 

69 JOIN heimdall.known_sources ks ON rs.known_source_id = ks.id 

70 LEFT JOIN heimdall.measurements m ON m.created_at >= rs.session_start  

71 AND (rs.session_end IS NULL OR m.created_at <= rs.session_end) 

72 {where_sql} 

73 GROUP BY rs.id, ks.name, ks.frequency_hz, ks.latitude, ks.longitude 

74 ORDER BY rs.session_start DESC 

75 LIMIT $1 OFFSET $2 

76 """ 

77 

78 count_query = f""" 

79 SELECT COUNT(*)  

80 FROM heimdall.recording_sessions rs 

81 WHERE 1=1 {where_sql} 

82 """ 

83 

84 async with pool.acquire() as conn: 

85 # Get sessions 

86 rows = await conn.fetch(query, per_page, offset) 

87 

88 # Get total count 

89 total = await conn.fetchval(count_query) 

90 

91 sessions = [ 

92 RecordingSessionWithDetails( 

93 id=row["id"], 

94 known_source_id=row["known_source_id"], 

95 session_name=row["session_name"], 

96 session_start=row["session_start"], 

97 session_end=row["session_end"], 

98 duration_seconds=row["duration_seconds"], 

99 celery_task_id=row["celery_task_id"], 

100 status=row["status"], 

101 approval_status=row["approval_status"], 

102 notes=row["notes"], 

103 created_at=row["created_at"], 

104 updated_at=row["updated_at"], 

105 source_name=row["source_name"], 

106 source_frequency=row["source_frequency"], 

107 source_latitude=row["source_latitude"], 

108 source_longitude=row["source_longitude"], 

109 measurements_count=row["measurements_count"], 

110 ) 

111 for row in rows 

112 ] 

113 

114 return SessionListResponse( 

115 sessions=sessions, 

116 total=total, 

117 page=page, 

118 per_page=per_page, 

119 ) 

120 

121 

122@router.get("/analytics", response_model=SessionAnalytics) 

123async def get_session_analytics(): 

124 """Get analytics for all sessions""" 

125 pool = await get_pool() 

126 

127 query = """ 

128 SELECT  

129 COUNT(*) as total_sessions, 

130 COUNT(*) FILTER (WHERE status = 'completed') as completed_sessions, 

131 COUNT(*) FILTER (WHERE status = 'failed') as failed_sessions, 

132 COUNT(*) FILTER (WHERE status = 'pending') as pending_sessions, 

133 AVG(duration_seconds) as average_duration_seconds, 

134 SUM((SELECT COUNT(*) FROM heimdall.measurements m  

135 WHERE m.created_at >= rs.session_start  

136 AND (rs.session_end IS NULL OR m.created_at <= rs.session_end))) as total_measurements 

137 FROM heimdall.recording_sessions rs 

138 """ 

139 

140 async with pool.acquire() as conn: 

141 row = await conn.fetchrow(query) 

142 

143 total = row["total_sessions"] or 0 

144 completed = row["completed_sessions"] or 0 

145 failed = row["failed_sessions"] or 0 

146 pending = row["pending_sessions"] or 0 

147 

148 success_rate = (completed / total * 100) if total > 0 else 0.0 

149 

150 # Calculate average accuracy from inference results if available 

151 accuracy_query = """ 

152 SELECT AVG(uncertainty_meters) as avg_accuracy 

153 FROM heimdall.recording_sessions rs 

154 WHERE rs.status = 'completed' 

155 """ 

156 

157 accuracy_row = await conn.fetchrow(accuracy_query) 

158 average_accuracy = accuracy_row["avg_accuracy"] if accuracy_row else None 

159 

160 return SessionAnalytics( 

161 total_sessions=total, 

162 completed_sessions=completed, 

163 failed_sessions=failed, 

164 pending_sessions=pending, 

165 success_rate=success_rate, 

166 total_measurements=row["total_measurements"] or 0, 

167 average_duration_seconds=row["average_duration_seconds"], 

168 average_accuracy_meters=average_accuracy, 

169 ) 

170 

171 

172@router.get("/{session_id}", response_model=RecordingSessionWithDetails) 

173async def get_session(session_id: UUID): 

174 """Get a specific recording session by ID""" 

175 pool = await get_pool() 

176 

177 query = """ 

178 SELECT  

179 rs.id, 

180 rs.known_source_id, 

181 rs.session_name, 

182 rs.session_start, 

183 rs.session_end, 

184 rs.duration_seconds, 

185 rs.celery_task_id, 

186 rs.status, 

187 rs.approval_status, 

188 rs.notes, 

189 rs.created_at, 

190 rs.updated_at, 

191 ks.name as source_name, 

192 ks.frequency_hz as source_frequency, 

193 ks.latitude as source_latitude, 

194 ks.longitude as source_longitude, 

195 COUNT(m.id) as measurements_count 

196 FROM heimdall.recording_sessions rs 

197 JOIN heimdall.known_sources ks ON rs.known_source_id = ks.id 

198 LEFT JOIN heimdall.measurements m ON m.created_at >= rs.session_start  

199 AND (rs.session_end IS NULL OR m.created_at <= rs.session_end) 

200 WHERE rs.id = $1 

201 GROUP BY rs.id, ks.name, ks.frequency_hz, ks.latitude, ks.longitude 

202 """ 

203 

204 async with pool.acquire() as conn: 

205 row = await conn.fetchrow(query, session_id) 

206 

207 if not row: 

208 raise HTTPException(status_code=404, detail="Session not found") 

209 

210 return RecordingSessionWithDetails( 

211 id=row["id"], 

212 known_source_id=row["known_source_id"], 

213 session_name=row["session_name"], 

214 session_start=row["session_start"], 

215 session_end=row["session_end"], 

216 duration_seconds=row["duration_seconds"], 

217 celery_task_id=row["celery_task_id"], 

218 status=row["status"], 

219 approval_status=row["approval_status"], 

220 notes=row["notes"], 

221 created_at=row["created_at"], 

222 updated_at=row["updated_at"], 

223 source_name=row["source_name"], 

224 source_frequency=row["source_frequency"], 

225 source_latitude=row["source_latitude"], 

226 source_longitude=row["source_longitude"], 

227 measurements_count=row["measurements_count"], 

228 ) 

229 

230 

231@router.post("", response_model=RecordingSession, status_code=201) 

232async def create_session(session: RecordingSessionCreate): 

233 """Create a new recording session""" 

234 pool = await get_pool() 

235 

236 # Verify known source exists 

237 async with pool.acquire() as conn: 

238 source_exists = await conn.fetchval( 

239 "SELECT EXISTS(SELECT 1 FROM heimdall.known_sources WHERE id = $1)", 

240 session.known_source_id 

241 ) 

242 

243 if not source_exists: 

244 raise HTTPException(status_code=404, detail="Known source not found") 

245 

246 # Insert new session 

247 query = """ 

248 INSERT INTO heimdall.recording_sessions  

249 (known_source_id, session_name, session_start, status, approval_status, notes) 

250 VALUES ($1, $2, $3, 'pending', 'pending', $4) 

251 RETURNING  

252 id, known_source_id, session_name, session_start, session_end, 

253 duration_seconds, celery_task_id, status, approval_status, 

254 notes, created_at, updated_at 

255 """ 

256 

257 row = await conn.fetchrow( 

258 query, 

259 session.known_source_id, 

260 session.session_name, 

261 datetime.utcnow(), 

262 session.notes, 

263 ) 

264 

265 return RecordingSession(**dict(row)) 

266 

267 

268@router.patch("/{session_id}/status") 

269async def update_session_status( 

270 session_id: UUID, 

271 status: str, 

272 celery_task_id: Optional[str] = None, 

273): 

274 """Update session status""" 

275 pool = await get_pool() 

276 

277 # Validate status 

278 valid_statuses = ["pending", "in_progress", "completed", "failed"] 

279 if status not in valid_statuses: 

280 raise HTTPException( 

281 status_code=400, 

282 detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}" 

283 ) 

284 

285 async with pool.acquire() as conn: 

286 # Update session 

287 query = """ 

288 UPDATE heimdall.recording_sessions 

289 SET status = $1,  

290 celery_task_id = COALESCE($2, celery_task_id), 

291 session_end = CASE WHEN $1 IN ('completed', 'failed') THEN NOW() ELSE session_end END, 

292 duration_seconds = CASE WHEN $1 IN ('completed', 'failed')  

293 THEN EXTRACT(EPOCH FROM (NOW() - session_start)) 

294 ELSE duration_seconds END, 

295 updated_at = NOW() 

296 WHERE id = $3 

297 RETURNING  

298 id, known_source_id, session_name, session_start, session_end, 

299 duration_seconds, celery_task_id, status, approval_status, 

300 notes, created_at, updated_at 

301 """ 

302 

303 row = await conn.fetchrow(query, status, celery_task_id, session_id) 

304 

305 if not row: 

306 raise HTTPException(status_code=404, detail="Session not found") 

307 

308 return RecordingSession(**dict(row)) 

309 

310 

311@router.patch("/{session_id}/approval") 

312async def update_session_approval(session_id: UUID, approval_status: str): 

313 """Update session approval status""" 

314 pool = await get_pool() 

315 

316 # Validate approval status 

317 valid_statuses = ["pending", "approved", "rejected"] 

318 if approval_status not in valid_statuses: 

319 raise HTTPException( 

320 status_code=400, 

321 detail=f"Invalid approval status. Must be one of: {', '.join(valid_statuses)}" 

322 ) 

323 

324 async with pool.acquire() as conn: 

325 query = """ 

326 UPDATE heimdall.recording_sessions 

327 SET approval_status = $1, updated_at = NOW() 

328 WHERE id = $2 

329 RETURNING  

330 id, known_source_id, session_name, session_start, session_end, 

331 duration_seconds, celery_task_id, status, approval_status, 

332 notes, created_at, updated_at 

333 """ 

334 

335 row = await conn.fetchrow(query, approval_status, session_id) 

336 

337 if not row: 

338 raise HTTPException(status_code=404, detail="Session not found") 

339 

340 return RecordingSession(**dict(row)) 

341 

342 

343@router.delete("/{session_id}", status_code=204) 

344async def delete_session(session_id: UUID): 

345 """Delete a recording session""" 

346 pool = await get_pool() 

347 

348 async with pool.acquire() as conn: 

349 result = await conn.execute( 

350 "DELETE FROM heimdall.recording_sessions WHERE id = $1", 

351 session_id 

352 ) 

353 

354 if result == "DELETE 0": 

355 raise HTTPException(status_code=404, detail="Session not found") 

356 

357 return None 

358 

359 

360@router.get("/known-sources", response_model=List[KnownSource]) 

361async def list_known_sources(): 

362 """List all known RF sources""" 

363 pool = await get_pool() 

364 

365 query = """ 

366 SELECT id, name, description, frequency_hz, latitude, longitude, 

367 power_dbm, source_type, is_validated, created_at, updated_at 

368 FROM heimdall.known_sources 

369 ORDER BY name 

370 """ 

371 

372 async with pool.acquire() as conn: 

373 rows = await conn.fetch(query) 

374 return [KnownSource(**dict(row)) for row in rows] 

375 

376 

377@router.post("/known-sources", response_model=KnownSource, status_code=201) 

378async def create_known_source(source: KnownSourceCreate): 

379 """Create a new known RF source""" 

380 pool = await get_pool() 

381 

382 query = """ 

383 INSERT INTO heimdall.known_sources  

384 (name, description, frequency_hz, latitude, longitude, power_dbm, source_type, is_validated) 

385 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 

386 RETURNING id, name, description, frequency_hz, latitude, longitude, 

387 power_dbm, source_type, is_validated, created_at, updated_at 

388 """ 

389 

390 async with pool.acquire() as conn: 

391 try: 

392 row = await conn.fetchrow( 

393 query, 

394 source.name, 

395 source.description, 

396 source.frequency_hz, 

397 source.latitude, 

398 source.longitude, 

399 source.power_dbm, 

400 source.source_type, 

401 source.is_validated, 

402 ) 

403 

404 return KnownSource(**dict(row)) 

405 except asyncpg.UniqueViolationError: 

406 raise HTTPException( 

407 status_code=400, 

408 detail="A known source with this name already exists" 

409 )