Coverage for services/rf-acquisition/src/models/db.py: 94%

50 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""SQLAlchemy ORM models for TimescaleDB storage.""" 

2 

3from datetime import datetime 

4from typing import Optional, Dict, Any 

5from sqlalchemy import Column, String, Integer, Float, DateTime, Text, BigInteger, Index 

6from sqlalchemy.orm import declarative_base 

7from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION 

8 

# Declarative base class; the ORM model class in this module subclasses it.
9Base = declarative_base() 

10 

11 

def _utc_now() -> datetime:
    """Return the current instant as a timezone-aware UTC datetime."""
    # Function-scope import: keeps this block self-contained without
    # requiring a change to the module-level imports.
    from datetime import timezone

    return datetime.now(timezone.utc)


class Measurement(Base):
    """
    Time-series measurement record optimized for TimescaleDB.

    Each row represents a measurement from one WebSDR receiver at a specific time.
    Supports fast queries on frequency, time, and receiver ID.
    """

    __tablename__ = "measurements"

    # Primary key (TimescaleDB hypertable)
    id = Column(BigInteger, primary_key=True, autoincrement=True)

    # Acquisition metadata
    task_id = Column(String(36), nullable=False, index=True)
    websdr_id = Column(Integer, nullable=False, index=True)

    # Signal parameters
    frequency_mhz = Column(DOUBLE_PRECISION, nullable=False)
    sample_rate_khz = Column(DOUBLE_PRECISION, nullable=False)
    samples_count = Column(Integer, nullable=False)

    # Timestamp (TimescaleDB time dimension).
    # The column is timezone-aware, so the Python-side default must be an
    # aware datetime as well; a naive ``utcnow()`` value would be interpreted
    # in the database session's time zone.
    timestamp_utc = Column(
        DateTime(timezone=True),
        nullable=False,
        index=True,
        default=_utc_now,
    )

    # Computed metrics
    snr_db = Column(DOUBLE_PRECISION, nullable=True)
    frequency_offset_hz = Column(DOUBLE_PRECISION, nullable=True)
    power_dbm = Column(DOUBLE_PRECISION, nullable=True)

    # Storage reference
    s3_path = Column(Text, nullable=True)

    # Compound indexes for common queries
    __table_args__ = (
        Index('idx_measurements_websdr_time', 'websdr_id', 'timestamp_utc'),
        Index('idx_measurements_task_time', 'task_id', 'timestamp_utc'),
        Index('idx_measurements_frequency', 'frequency_mhz', 'timestamp_utc'),
    )

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"<Measurement(id={self.id}, task_id={self.task_id}, "
            f"websdr_id={self.websdr_id}, snr={self.snr_db}dB, "
            f"timestamp={self.timestamp_utc})>"
        )

    @staticmethod
    def _required_field(data: Dict[str, Any], key: str, convert: Any) -> Any:
        """Fetch ``data[key]`` and convert it, naming the key on failure."""
        value = data.get(key)
        if value is None:
            raise ValueError(f"missing required field '{key}'")
        try:
            return convert(value)
        except (TypeError, ValueError) as e:
            raise ValueError(f"invalid value for '{key}': {value!r}") from e

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Turn a str/datetime/None timestamp into a datetime."""
        if value is None:
            # No timestamp supplied: fall back to the current UTC time.
            return _utc_now()
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11
            # on, so normalise it to an explicit offset first.
            text = value[:-1] + "+00:00" if value.endswith("Z") else value
            return datetime.fromisoformat(text)
        raise TypeError(
            f"timestamp_utc must be str or datetime, got {type(value).__name__}"
        )

    @classmethod
    def from_measurement_dict(
        cls,
        task_id: str,
        measurement_dict: Dict[str, Any],
        s3_path: Optional[str] = None,
    ) -> "Measurement":
        """
        Create a Measurement instance from measurement dictionary.

        Args:
            task_id: Acquisition task ID
            measurement_dict: Dictionary containing measurement data
                Expected keys:
                - websdr_id (int)
                - frequency_mhz (float)
                - sample_rate_khz (float)
                - samples_count (int)
                - timestamp_utc (str or datetime; optional, defaults to
                  the current UTC time)
                - metrics (optional dict with snr_db, frequency_offset_hz,
                  power_dbm; absent or None means no metrics)
            s3_path: Optional S3 path where IQ data is stored

        Returns:
            Measurement instance

        Raises:
            ValueError: If a required field is missing or any value cannot
                be converted. Conversion errors of other types are wrapped
                in ValueError, with the original exception chained.
        """
        # NOTE(review): a caller-supplied naive timestamp is stored as given;
        # presumably it is already UTC -- confirm against producers.
        try:
            websdr_id = cls._required_field(measurement_dict, "websdr_id", int)
            frequency_mhz = cls._required_field(
                measurement_dict, "frequency_mhz", float
            )
            sample_rate_khz = cls._required_field(
                measurement_dict, "sample_rate_khz", float
            )
            samples_count = cls._required_field(
                measurement_dict, "samples_count", int
            )

            timestamp_utc = cls._parse_timestamp(
                measurement_dict.get("timestamp_utc")
            )

            # ``or {}`` also covers an explicit ``"metrics": None`` entry,
            # which ``.get("metrics", {})`` would pass through as None.
            metrics = measurement_dict.get("metrics") or {}
            optional_metrics = {}
            for name in ("snr_db", "frequency_offset_hz", "power_dbm"):
                raw = metrics.get(name)
                optional_metrics[name] = None if raw is None else float(raw)

            return cls(
                task_id=task_id,
                websdr_id=websdr_id,
                frequency_mhz=frequency_mhz,
                sample_rate_khz=sample_rate_khz,
                samples_count=samples_count,
                timestamp_utc=timestamp_utc,
                s3_path=s3_path,
                **optional_metrics,
            )
        except (KeyError, ValueError, TypeError, AttributeError) as e:
            # AttributeError covers non-mapping inputs (e.g. metrics given
            # as a list), so callers only ever need to handle ValueError.
            raise ValueError(
                f"Failed to create Measurement from dict: {str(e)}"
            ) from e

    def to_dict(self) -> Dict[str, Any]:
        """Convert measurement to dictionary."""
        return {
            "id": self.id,
            "task_id": self.task_id,
            "websdr_id": self.websdr_id,
            "frequency_mhz": self.frequency_mhz,
            "sample_rate_khz": self.sample_rate_khz,
            "samples_count": self.samples_count,
            # Serialised as an ISO 8601 string; None when no timestamp is set.
            "timestamp_utc": self.timestamp_utc.isoformat() if self.timestamp_utc else None,
            "snr_db": self.snr_db,
            "frequency_offset_hz": self.frequency_offset_hz,
            "power_dbm": self.power_dbm,
            "s3_path": self.s3_path,
        }