Coverage for services/inference/src/utils/preprocessing.py: 30%
122 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""IQ data preprocessing pipeline for Phase 6 Inference Service.
3Converts raw IQ data (time-domain) to mel-spectrogram features suitable for
4neural network inference. Matches training pipeline from Phase 5.
5"""
7import numpy as np
8import logging
9from typing import List, Tuple, Optional
10from dataclasses import dataclass
# Module-level logger named after this module, so its records can be filtered
# and configured per module by the application's logging setup.
logger = logging.getLogger(__name__)
@dataclass
class PreprocessingConfig:
    """Configuration for the IQ preprocessing pipeline.

    Frequencies are normalized (cycles per sample), so the valid range is
    ``0 <= f_min < f_max <= 0.5`` where 0.5 is the Nyquist frequency.

    Raises:
        ValueError: If ``n_fft``, ``hop_length`` or ``n_mels`` is not
            positive, or if the frequency range is invalid.
    """

    # FFT parameters
    n_fft: int = 512  # samples per analysis frame
    hop_length: int = 128  # samples between the starts of consecutive frames

    # Mel-spectrogram parameters
    n_mels: int = 128
    f_min: float = 0.0
    f_max: float = 0.5  # Normalized frequency (Nyquist = 0.5)
    power: float = 2.0

    # Normalization
    normalize: bool = True
    norm_mean: Optional[float] = None
    norm_std: Optional[float] = None

    def __post_init__(self):
        """Validate configuration."""
        if self.n_fft <= 0:
            raise ValueError(f"n_fft must be positive, got {self.n_fft}")
        # hop_length is used as a divisor when counting STFT frames; reject
        # non-positive values here rather than failing later with a
        # ZeroDivisionError deep inside the pipeline.
        if self.hop_length <= 0:
            raise ValueError(f"hop_length must be positive, got {self.hop_length}")
        if self.n_mels <= 0:
            raise ValueError(f"n_mels must be positive, got {self.n_mels}")
        if not (0 <= self.f_min < self.f_max <= 0.5):
            raise ValueError(f"Invalid frequency range: f_min={self.f_min}, f_max={self.f_max}")
class IQPreprocessor:
    """
    Preprocessing pipeline that turns raw IQ samples into a log-mel-spectrogram.

    Converts:
        - Input: IQ samples as a sequence of [I, Q] pairs (shape: (N, 2))
        - Output: Mel-spectrogram (shape: (n_mels, time_steps))

    Pipeline:
        1. Convert to complex IQ representation
        2. Compute spectrogram (|FFT| ** config.power; magnitude squared
           with the default power of 2.0)
        3. Convert to mel scale
        4. Apply logarithmic scaling
        5. Normalize (optional)
    """

    def __init__(self, config: Optional["PreprocessingConfig"] = None):
        """
        Initialize preprocessor.

        Args:
            config: PreprocessingConfig instance. Defaults to standard config.
        """
        self.config = config if config is not None else PreprocessingConfig()
        # Built lazily on first use; not rebuilt if the config is mutated later.
        self._mel_fb = None
        logger.info(f"IQPreprocessor initialized with config: {self.config}")

    def preprocess(self, iq_data: List[List[float]]) -> np.ndarray:
        """
        Preprocess raw IQ data to mel-spectrogram.

        Args:
            iq_data: List of [I, Q] samples. Shape: (N, 2) where N is number
                of samples, e.g. [[I1, Q1], [I2, Q2], ...]. N must be at
                least ``config.n_fft``.

        Returns:
            Log-scaled (and, if configured, normalized) mel-spectrogram:
            np.ndarray of shape (n_mels, time_steps)

        Raises:
            RuntimeError: On any failure. Input-validation errors are also
                reported this way; the original exception (for example the
                ValueError raised for a bad shape) is chained as
                ``__cause__``.
        """
        try:
            # Step 1: Convert to complex IQ
            iq_complex = self._to_complex_iq(iq_data)
            logger.debug(f"IQ shape: {iq_complex.shape}, dtype: {iq_complex.dtype}")

            # Step 2: Compute spectrogram using FFT
            spectrogram = self._compute_spectrogram(iq_complex)
            logger.debug(f"Spectrogram shape: {spectrogram.shape}")

            # Step 3: Convert to mel scale
            mel_spec = self._to_mel_scale(spectrogram)
            logger.debug(f"Mel-spectrogram shape: {mel_spec.shape}")

            # Step 4: Apply log scaling
            mel_spec_log = self._apply_log_scale(mel_spec)

            # Step 5: Normalize if configured
            if self.config.normalize:
                mel_spec_log = self._normalize(mel_spec_log)

            return mel_spec_log

        except Exception as e:
            # Single error type at the pipeline boundary; the cause is kept
            # via exception chaining and the traceback is logged.
            logger.error(f"Preprocessing failed: {e}", exc_info=True)
            raise RuntimeError(f"IQ preprocessing error: {e}") from e

    def _to_complex_iq(self, iq_data: List[List[float]]) -> np.ndarray:
        """
        Convert IQ samples to complex representation.

        Args:
            iq_data: List of [I, Q] pairs or (N, 2) array

        Returns:
            Complex array: shape (N,)

        Raises:
            ValueError: If the data is not of shape (N, 2) or holds fewer
                than ``config.n_fft`` samples.
        """
        iq_array = np.array(iq_data, dtype=np.float32)

        # Validate shape
        if iq_array.ndim != 2 or iq_array.shape[1] != 2:
            raise ValueError(
                f"Expected (N, 2) array, got shape {iq_array.shape}. "
                f"Input should be list of [I, Q] pairs."
            )

        # At least one full FFT frame is required.
        if iq_array.shape[0] < self.config.n_fft:
            raise ValueError(
                f"Not enough samples: {iq_array.shape[0]} < n_fft={self.config.n_fft}. "
                f"Need at least {self.config.n_fft} samples."
            )

        in_phase = iq_array[:, 0]
        quadrature = iq_array[:, 1]

        # Complex: I + 1j*Q
        iq_complex = in_phase + 1j * quadrature

        logger.debug(f"Converted {len(iq_complex)} IQ samples to complex")
        return iq_complex

    def _compute_spectrogram(self, iq_complex: np.ndarray) -> np.ndarray:
        """
        Compute the spectrogram via a Hann-windowed STFT.

        Each frame contributes ``|FFT| ** config.power / n_fft``; with the
        default ``power`` of 2.0 this is the power spectrogram.

        Args:
            iq_complex: Complex IQ signal, shape (N,)

        Returns:
            Spectrogram: float32 array of shape (n_fft//2 + 1, time_steps)

        Raises:
            ValueError: If ``config.hop_length`` is not positive.
        """
        n_fft = self.config.n_fft
        hop_length = self.config.hop_length
        exponent = self.config.power

        # hop_length is a divisor below; fail with a clear message instead of
        # a ZeroDivisionError (or a nonsensical frame count for negatives).
        if hop_length <= 0:
            raise ValueError(f"hop_length must be positive, got {hop_length}")

        n_bins = n_fft // 2 + 1
        window = np.hanning(n_fft)

        # Only complete frames are used; trailing samples that do not fill a
        # whole frame are dropped.
        n_frames = (len(iq_complex) - n_fft) // hop_length + 1
        spectrogram = np.zeros((n_bins, n_frames), dtype=np.float32)

        for i in range(n_frames):
            start = i * hop_length
            frame = iq_complex[start:start + n_fft] * window

            # NOTE(review): only bins 0..n_fft//2 are kept. For complex IQ
            # input the remaining (negative-frequency) bins are not redundant,
            # so that half of the spectrum is discarded here. Left as-is so
            # the features stay identical to what the model was trained on -
            # confirm against the training pipeline.
            magnitude = np.abs(np.fft.fft(frame)[:n_bins])

            spectrogram[:, i] = (magnitude ** exponent) / n_fft

        logger.debug(f"Computed STFT: {n_frames} frames of {n_fft} samples")
        return spectrogram

    def _to_mel_scale(self, spectrogram: np.ndarray) -> np.ndarray:
        """
        Convert a linear-frequency spectrogram to mel scale.

        Args:
            spectrogram: Linear spectrogram, shape (n_fft//2 + 1, time_steps)

        Returns:
            Mel-spectrogram: shape (n_mels, time_steps)
        """
        # Build mel filterbank if not cached
        if self._mel_fb is None:
            self._mel_fb = self._build_mel_filterbank()

        # Apply mel filterbank: (n_mels, n_fft//2+1) @ (n_fft//2+1, time_steps)
        mel_spec = np.dot(self._mel_fb, spectrogram)

        logger.debug(f"Converted to mel scale: {mel_spec.shape}")
        return mel_spec

    def _build_mel_filterbank(self) -> np.ndarray:
        """
        Build the triangular mel filterbank matrix.

        Returns:
            Filterbank: shape (n_mels, n_fft//2 + 1)
        """
        n_mels = self.config.n_mels
        n_fft_bins = self.config.n_fft // 2 + 1

        # n_mels + 2 points equally spaced on the mel axis: each filter uses
        # three consecutive points as its left edge, centre and right edge.
        mel_points = np.linspace(
            self._hz_to_mel(self.config.f_min),
            self._hz_to_mel(self.config.f_max),
            n_mels + 2
        )
        # Converted one value at a time on purpose: the results are floored
        # to bin indices below, so the arithmetic is kept exactly as it was.
        freq_points = np.array([self._mel_to_hz(m) for m in mel_points])

        # NOTE(review): frequencies here are normalized (<= 0.5) and are
        # scaled by n_fft_bins, so f_max = 0.5 lands at about n_fft_bins / 2
        # and the filters span only the lower half of the spectrogram rows.
        # FFT bin k sits at normalized frequency k / n_fft, which suggests the
        # scale factor should be n_fft. Not changed, because doing so alters
        # every feature value - confirm against the training pipeline first.
        bin_points = np.array([
            int(np.floor(f * n_fft_bins)) for f in freq_points
        ])

        filterbank = np.zeros((n_mels, n_fft_bins))

        for m in range(n_mels):
            left, center, right = bin_points[m:m + 3]

            # Rising edge: 0 at `left`, increasing towards `center`.
            if center > left:
                filterbank[m, left:center] = (
                    np.arange(center - left) / (center - left)
                )

            # Falling edge: 1 at `center`, decreasing towards `right`.
            if right > center:
                filterbank[m, center:right] = (
                    np.arange(right - center, 0, -1) / (right - center)
                )

        logger.debug(f"Built mel filterbank: {filterbank.shape}")
        return filterbank

    @staticmethod
    def _hz_to_mel(hz: float) -> float:
        """Convert Hz to mel scale."""
        return 2595 * np.log10(1 + hz / 700)

    @staticmethod
    def _mel_to_hz(mel: float) -> float:
        """Convert mel to Hz (inverse of ``_hz_to_mel``)."""
        return 700 * (10 ** (mel / 2595) - 1)

    def _apply_log_scale(self, mel_spec: np.ndarray, epsilon: float = 1e-10) -> np.ndarray:
        """
        Apply natural-logarithm scaling.

        Args:
            mel_spec: Mel-spectrogram
            epsilon: Small value added before the log to avoid log(0)

        Returns:
            Log-scaled mel-spectrogram
        """
        mel_spec_log = np.log(mel_spec + epsilon)
        # The min/max reductions exist only for the log line; skip them when
        # debug logging is off.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Applied log scaling: min={mel_spec_log.min():.3f}, max={mel_spec_log.max():.3f}")
        return mel_spec_log

    def _normalize(self, mel_spec_log: np.ndarray) -> np.ndarray:
        """
        Normalize mel-spectrogram.

        Uses ``config.norm_mean`` / ``config.norm_std`` when both are set;
        if either is missing, both are ignored and the statistics of the
        input itself are used.

        Args:
            mel_spec_log: Log-scaled mel-spectrogram

        Returns:
            ``(mel_spec_log - mean) / std``, or the input unchanged when the
            standard deviation is zero.
        """
        if self.config.norm_mean is not None and self.config.norm_std is not None:
            mean = self.config.norm_mean
            std = self.config.norm_std
        else:
            mean = mel_spec_log.mean()
            std = mel_spec_log.std()

        # Avoid dividing by zero (constant input or norm_std == 0).
        if std == 0:
            logger.warning("Standard deviation is zero, skipping normalization")
            return mel_spec_log

        normalized = (mel_spec_log - mean) / std
        # The mean/std reductions exist only for the log line; skip them when
        # debug logging is off.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                f"Normalized: mean={normalized.mean():.6f}, std={normalized.std():.6f}"
            )
        return normalized

    def get_config_dict(self) -> dict:
        """Return configuration as dictionary for metadata."""
        return {
            'n_fft': self.config.n_fft,
            'hop_length': self.config.hop_length,
            'n_mels': self.config.n_mels,
            'f_min': self.config.f_min,
            'f_max': self.config.f_max,
            'power': self.config.power,
            'normalize': self.config.normalize,
        }
def preprocess_iq_data(iq_data: List[List[float]]) -> Tuple[np.ndarray, dict]:
    """
    Run the default-configured preprocessing pipeline and summarise its output.

    Args:
        iq_data: List of [I, Q] samples

    Returns:
        Tuple of (mel_spectrogram, metadata_dict). The metadata holds the
        spectrogram's shape, its dtype name and its min/max/mean/std.
    """
    features = IQPreprocessor().preprocess(iq_data)

    summary = {'shape': features.shape, 'dtype': str(features.dtype)}
    # Each statistic is the zero-argument ndarray method of the same name,
    # converted to a plain Python float.
    for stat in ('min', 'max', 'mean', 'std'):
        summary[stat] = float(getattr(features, stat)())

    return features, summary