Coverage for services/rf-acquisition/src/models/db.py: 94%
50 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""SQLAlchemy ORM models for TimescaleDB storage."""
3from datetime import datetime
4from typing import Optional, Dict, Any
5from sqlalchemy import Column, String, Integer, Float, DateTime, Text, BigInteger, Index
6from sqlalchemy.orm import declarative_base
7from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
9Base = declarative_base()
12class Measurement(Base):
13 """
14 Time-series measurement record optimized for TimescaleDB.
16 Each row represents a measurement from one WebSDR receiver at a specific time.
17 Supports fast queries on frequency, time, and receiver ID.
18 """
20 __tablename__ = "measurements"
22 # Primary key (TimescaleDB hypertable)
23 id = Column(BigInteger, primary_key=True, autoincrement=True)
25 # Acquisition metadata
26 task_id = Column(String(36), nullable=False, index=True)
27 websdr_id = Column(Integer, nullable=False, index=True)
29 # Signal parameters
30 frequency_mhz = Column(DOUBLE_PRECISION, nullable=False)
31 sample_rate_khz = Column(DOUBLE_PRECISION, nullable=False)
32 samples_count = Column(Integer, nullable=False)
34 # Timestamp (TimescaleDB time dimension)
35 timestamp_utc = Column(
36 DateTime(timezone=True),
37 nullable=False,
38 index=True,
39 default=datetime.utcnow
40 )
42 # Computed metrics
43 snr_db = Column(DOUBLE_PRECISION, nullable=True)
44 frequency_offset_hz = Column(DOUBLE_PRECISION, nullable=True)
45 power_dbm = Column(DOUBLE_PRECISION, nullable=True)
47 # Storage reference
48 s3_path = Column(Text, nullable=True)
50 # Compound indexes for common queries
51 __table_args__ = (
52 Index('idx_measurements_websdr_time', 'websdr_id', 'timestamp_utc'),
53 Index('idx_measurements_task_time', 'task_id', 'timestamp_utc'),
54 Index('idx_measurements_frequency', 'frequency_mhz', 'timestamp_utc'),
55 )
57 def __repr__(self) -> str:
58 """String representation."""
59 return (
60 f"<Measurement(id={self.id}, task_id={self.task_id}, "
61 f"websdr_id={self.websdr_id}, snr={self.snr_db}dB, "
62 f"timestamp={self.timestamp_utc})>"
63 )
65 @classmethod
66 def from_measurement_dict(
67 cls,
68 task_id: str,
69 measurement_dict: Dict[str, Any],
70 s3_path: Optional[str] = None,
71 ) -> "Measurement":
72 """
73 Create a Measurement instance from measurement dictionary.
75 Args:
76 task_id: Acquisition task ID
77 measurement_dict: Dictionary containing measurement data
78 Expected keys:
79 - websdr_id (int)
80 - frequency_mhz (float)
81 - sample_rate_khz (float)
82 - samples_count (int)
83 - timestamp_utc (str or datetime)
84 - metrics (dict with snr_db, frequency_offset_hz, power_dbm)
85 s3_path: Optional S3 path where IQ data is stored
87 Returns:
88 Measurement instance
90 Raises:
91 ValueError: If required fields are missing
92 TypeError: If types cannot be converted
93 """
94 try:
95 # Extract and validate required fields
96 websdr_id = int(measurement_dict.get("websdr_id"))
97 frequency_mhz = float(measurement_dict.get("frequency_mhz"))
98 sample_rate_khz = float(measurement_dict.get("sample_rate_khz"))
99 samples_count = int(measurement_dict.get("samples_count"))
101 # Handle timestamp
102 timestamp_str = measurement_dict.get("timestamp_utc")
103 if isinstance(timestamp_str, str):
104 # Parse ISO format datetime
105 if "T" in timestamp_str:
106 timestamp_utc = datetime.fromisoformat(
107 timestamp_str.replace("Z", "+00:00")
108 )
109 else:
110 timestamp_utc = datetime.fromisoformat(timestamp_str)
111 else:
112 timestamp_utc = timestamp_str or datetime.utcnow()
114 # Extract metrics
115 metrics = measurement_dict.get("metrics", {})
116 snr_db = metrics.get("snr_db")
117 frequency_offset_hz = metrics.get("frequency_offset_hz")
118 power_dbm = metrics.get("power_dbm")
120 # Convert to float if present
121 if snr_db is not None:
122 snr_db = float(snr_db)
123 if frequency_offset_hz is not None:
124 frequency_offset_hz = float(frequency_offset_hz)
125 if power_dbm is not None:
126 power_dbm = float(power_dbm)
128 return cls(
129 task_id=task_id,
130 websdr_id=websdr_id,
131 frequency_mhz=frequency_mhz,
132 sample_rate_khz=sample_rate_khz,
133 samples_count=samples_count,
134 timestamp_utc=timestamp_utc,
135 snr_db=snr_db,
136 frequency_offset_hz=frequency_offset_hz,
137 power_dbm=power_dbm,
138 s3_path=s3_path,
139 )
140 except (KeyError, ValueError, TypeError) as e:
141 raise ValueError(
142 f"Failed to create Measurement from dict: {str(e)}"
143 ) from e
145 def to_dict(self) -> Dict[str, Any]:
146 """Convert measurement to dictionary."""
147 return {
148 "id": self.id,
149 "task_id": self.task_id,
150 "websdr_id": self.websdr_id,
151 "frequency_mhz": self.frequency_mhz,
152 "sample_rate_khz": self.sample_rate_khz,
153 "samples_count": self.samples_count,
154 "timestamp_utc": self.timestamp_utc.isoformat() if self.timestamp_utc else None,
155 "snr_db": self.snr_db,
156 "frequency_offset_hz": self.frequency_offset_hz,
157 "power_dbm": self.power_dbm,
158 "s3_path": self.s3_path,
159 }