Coverage for services/data-ingestion-web/src/repository.py: 0%
65 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""Repository pattern for database operations"""
2from datetime import datetime
3from typing import Optional, List
4from sqlalchemy.orm import Session
5from sqlalchemy import desc
6from .models.session import RecordingSessionORM, RecordingSessionResponse, SessionStatus
class SessionRepository:
    """Data access layer for recording sessions.

    All methods are static and take the SQLAlchemy ``Session`` to operate on
    as their first argument; the class itself holds no state.  Methods that
    modify data commit immediately.  If the commit fails the session is
    rolled back and the original exception is re-raised, so the caller's
    session is not left in a failed-transaction state.
    """

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12.  Stripping the
        tzinfo from an aware UTC timestamp yields the same naive value that
        ``utcnow()`` produced, so stored timestamps keep their meaning.
        """
        # Local import so the module-level import block does not need to change.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit the current transaction, rolling back if the commit fails."""
        try:
            db.commit()
        except Exception:
            # A failed commit/flush leaves the Session unusable until it is
            # rolled back; reset it and let the caller see the real error.
            db.rollback()
            raise

    @staticmethod
    def _commit_and_refresh(db: Session, record: RecordingSessionORM) -> RecordingSessionORM:
        """Commit pending changes and reload ``record`` from the database."""
        SessionRepository._commit(db)
        db.refresh(record)
        return record

    @staticmethod
    def create(
        db: Session,
        session_name: str,
        frequency_mhz: float,
        duration_seconds: int,
        websdrs_enabled: int = 7,
    ) -> RecordingSessionORM:
        """Create a new recording session in ``PENDING`` status.

        Args:
            db: Database session used for the insert.
            session_name: Name stored on the new row.
            frequency_mhz: Frequency stored on the new row.
            duration_seconds: Duration stored on the new row.
            websdrs_enabled: Value stored in the ``websdrs_enabled`` column.
                Defaults to 7, the value that was previously hard-coded.

        Returns:
            The persisted row, refreshed after commit.
        """
        session = RecordingSessionORM(
            session_name=session_name,
            frequency_mhz=frequency_mhz,
            duration_seconds=duration_seconds,
            status=SessionStatus.PENDING.value,
            websdrs_enabled=websdrs_enabled,
            created_at=SessionRepository._utcnow(),
        )
        db.add(session)
        return SessionRepository._commit_and_refresh(db, session)

    @staticmethod
    def get_by_id(db: Session, session_id: int) -> Optional[RecordingSessionORM]:
        """Fetch a session by primary key, or ``None`` if it does not exist."""
        return db.query(RecordingSessionORM).filter(RecordingSessionORM.id == session_id).first()

    @staticmethod
    def get_by_task_id(db: Session, task_id: str) -> Optional[RecordingSessionORM]:
        """Fetch a session by its Celery task ID, or ``None`` if none matches."""
        return db.query(RecordingSessionORM).filter(RecordingSessionORM.celery_task_id == task_id).first()

    @staticmethod
    def list_sessions(db: Session, offset: int = 0, limit: int = 20) -> tuple[int, List[RecordingSessionORM]]:
        """List sessions with pagination, newest first.

        Args:
            db: Database session used for the queries.
            offset: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            A ``(total, sessions)`` tuple where ``total`` is the count of all
            sessions (ignoring pagination) and ``sessions`` is the requested page.
        """
        total = db.query(RecordingSessionORM).count()
        sessions = (
            db.query(RecordingSessionORM)
            # ``id`` breaks ties between rows sharing a ``created_at`` value;
            # without it, page boundaries are not stable across requests.
            .order_by(desc(RecordingSessionORM.created_at), desc(RecordingSessionORM.id))
            .offset(offset)
            .limit(limit)
            .all()
        )
        return total, sessions

    @staticmethod
    def update_status(
        db: Session,
        session_id: int,
        status: SessionStatus,
        celery_task_id: Optional[str] = None,
        started_at: Optional[datetime] = None,
    ) -> Optional[RecordingSessionORM]:
        """Update a session's status and, optionally, its task ID and start time.

        ``celery_task_id`` and ``started_at`` are only written when a truthy
        value is supplied; otherwise the stored values are left untouched.

        Returns:
            The refreshed row, or ``None`` if no session has ``session_id``.
        """
        session = SessionRepository.get_by_id(db, session_id)
        if session is None:
            return None

        session.status = status.value
        if celery_task_id:
            session.celery_task_id = celery_task_id
        if started_at:
            session.started_at = started_at
        return SessionRepository._commit_and_refresh(db, session)

    @staticmethod
    def update_completed(
        db: Session,
        session_id: int,
        result_metadata: dict,
        minio_path: str,
    ) -> Optional[RecordingSessionORM]:
        """Mark a session as completed and store its results.

        Sets the status to ``COMPLETED``, records ``result_metadata`` and
        ``minio_path``, and stamps ``completed_at`` with the current UTC time.

        Returns:
            The refreshed row, or ``None`` if no session has ``session_id``.
        """
        session = SessionRepository.get_by_id(db, session_id)
        if session is None:
            return None

        session.status = SessionStatus.COMPLETED.value
        session.result_metadata = result_metadata
        session.minio_path = minio_path
        session.completed_at = SessionRepository._utcnow()
        return SessionRepository._commit_and_refresh(db, session)

    @staticmethod
    def update_failed(db: Session, session_id: int, error_message: str) -> Optional[RecordingSessionORM]:
        """Mark a session as failed and store the error message.

        Sets the status to ``FAILED``, records ``error_message``, and stamps
        ``completed_at`` with the current UTC time.

        Returns:
            The refreshed row, or ``None`` if no session has ``session_id``.
        """
        session = SessionRepository.get_by_id(db, session_id)
        if session is None:
            return None

        session.status = SessionStatus.FAILED.value
        session.error_message = error_message
        session.completed_at = SessionRepository._utcnow()
        return SessionRepository._commit_and_refresh(db, session)

    @staticmethod
    def delete(db: Session, session_id: int) -> bool:
        """Delete a session by ID.

        Returns:
            ``True`` if a row was deleted, ``False`` if no session has ``session_id``.
        """
        session = SessionRepository.get_by_id(db, session_id)
        if session is None:
            return False

        db.delete(session)
        SessionRepository._commit(db)
        return True