Coverage for services/data-ingestion-web/src/repository.py: 0%

65 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""Repository pattern for database operations""" 

2from datetime import datetime 

3from typing import Optional, List 

4from sqlalchemy.orm import Session 

5from sqlalchemy import desc 

6from .models.session import RecordingSessionORM, RecordingSessionResponse, SessionStatus 

7 

8 

class SessionRepository:
    """Data access layer for recording sessions.

    Every public method is a ``@staticmethod`` whose first argument is the
    SQLAlchemy ``Session`` to operate on; the class itself holds no state.
    Methods that write commit immediately.  If the commit raises, the
    session is rolled back and the original exception is re-raised.
    """

    # Value written to ``websdrs_enabled`` when ``create`` is called without
    # an explicit one (this is the value that used to be hard-coded).
    DEFAULT_WEBSDRS_ENABLED = 7

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Yields the same value as ``datetime.utcnow()`` without calling that
        function, which is deprecated since Python 3.12.  The result is kept
        naive so the values written to the timestamp columns are unchanged.
        """
        # Local import: the module-level import only brings in ``datetime``.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit ``db``; on failure roll back and re-raise the error."""
        try:
            db.commit()
        except Exception:
            # Reset the session so the caller can keep using it, then let
            # the original error propagate unchanged.
            db.rollback()
            raise

    @staticmethod
    def create(
        db: Session,
        session_name: str,
        frequency_mhz: float,
        duration_seconds: int,
        websdrs_enabled: int = DEFAULT_WEBSDRS_ENABLED,
    ) -> RecordingSessionORM:
        """Create and persist a new recording session.

        The row is stored with status ``SessionStatus.PENDING`` and
        ``created_at`` set to the current UTC time.

        Args:
            db: Session used for the insert.
            session_name: Stored in the ``session_name`` column.
            frequency_mhz: Stored in the ``frequency_mhz`` column.
            duration_seconds: Stored in the ``duration_seconds`` column.
            websdrs_enabled: Stored in the ``websdrs_enabled`` column;
                defaults to ``DEFAULT_WEBSDRS_ENABLED``.

        Returns:
            The newly created row, refreshed from the database.
        """
        record = RecordingSessionORM(
            session_name=session_name,
            frequency_mhz=frequency_mhz,
            duration_seconds=duration_seconds,
            status=SessionStatus.PENDING.value,
            websdrs_enabled=websdrs_enabled,
            created_at=SessionRepository._utcnow(),
        )
        db.add(record)
        SessionRepository._commit(db)
        db.refresh(record)
        return record

    @staticmethod
    def get_by_id(db: Session, session_id: int) -> Optional[RecordingSessionORM]:
        """Fetch the session whose ``id`` equals ``session_id``, or ``None``."""
        return (
            db.query(RecordingSessionORM)
            .filter(RecordingSessionORM.id == session_id)
            .first()
        )

    @staticmethod
    def get_by_task_id(db: Session, task_id: str) -> Optional[RecordingSessionORM]:
        """Fetch the session whose ``celery_task_id`` equals ``task_id``, or ``None``."""
        return (
            db.query(RecordingSessionORM)
            .filter(RecordingSessionORM.celery_task_id == task_id)
            .first()
        )

    @staticmethod
    def list_sessions(db: Session, offset: int = 0, limit: int = 20) -> tuple[int, List[RecordingSessionORM]]:
        """List sessions with pagination, newest first.

        Args:
            db: Session used for the queries.
            offset: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            ``(total, sessions)`` where ``total`` is the count of all
            sessions (ignoring ``offset``/``limit``) and ``sessions`` is the
            requested page.
        """
        total = db.query(RecordingSessionORM).count()
        sessions = (
            db.query(RecordingSessionORM)
            .order_by(
                desc(RecordingSessionORM.created_at),
                # Tie-breaker: rows sharing a ``created_at`` would otherwise
                # have no defined order and could repeat or vanish between
                # pages.
                desc(RecordingSessionORM.id),
            )
            .offset(offset)
            .limit(limit)
            .all()
        )
        return total, sessions

    @staticmethod
    def update_status(
        db: Session,
        session_id: int,
        status: SessionStatus,
        celery_task_id: Optional[str] = None,
        started_at: Optional[datetime] = None,
    ) -> Optional[RecordingSessionORM]:
        """Update a session's status and, optionally, task id and start time.

        Args:
            db: Session used for the update.
            session_id: ``id`` of the row to update.
            status: New status; its ``.value`` is what gets stored.
            celery_task_id: Written only when truthy, so ``None`` and ``""``
                both leave the stored value untouched.
            started_at: Written only when provided.

        Returns:
            The refreshed row, or ``None`` when no row has that id.
        """
        record = SessionRepository.get_by_id(db, session_id)
        if not record:
            return record

        record.status = status.value
        if celery_task_id:
            record.celery_task_id = celery_task_id
        if started_at is not None:
            record.started_at = started_at
        SessionRepository._commit(db)
        db.refresh(record)
        return record

    @staticmethod
    def update_completed(
        db: Session,
        session_id: int,
        result_metadata: dict,
        minio_path: str,
    ) -> Optional[RecordingSessionORM]:
        """Mark a session as completed and store its results.

        Sets status to ``SessionStatus.COMPLETED``, stores
        ``result_metadata`` and ``minio_path``, and stamps ``completed_at``
        with the current UTC time.

        Returns:
            The refreshed row, or ``None`` when no row has that id.
        """
        record = SessionRepository.get_by_id(db, session_id)
        if not record:
            return record

        record.status = SessionStatus.COMPLETED.value
        record.result_metadata = result_metadata
        record.minio_path = minio_path
        record.completed_at = SessionRepository._utcnow()
        SessionRepository._commit(db)
        db.refresh(record)
        return record

    @staticmethod
    def update_failed(db: Session, session_id: int, error_message: str) -> Optional[RecordingSessionORM]:
        """Mark a session as failed and store the error message.

        Sets status to ``SessionStatus.FAILED``, stores ``error_message``,
        and stamps ``completed_at`` with the current UTC time.

        Returns:
            The refreshed row, or ``None`` when no row has that id.
        """
        record = SessionRepository.get_by_id(db, session_id)
        if not record:
            return record

        record.status = SessionStatus.FAILED.value
        record.error_message = error_message
        record.completed_at = SessionRepository._utcnow()
        SessionRepository._commit(db)
        db.refresh(record)
        return record

    @staticmethod
    def delete(db: Session, session_id: int) -> bool:
        """Delete a session by ID. Returns True if deleted, False if not found."""
        record = SessionRepository.get_by_id(db, session_id)
        if not record:
            return False

        db.delete(record)
        SessionRepository._commit(db)
        return True

109 return False