Coverage for services/inference/src/utils/preprocessing.py: 30%

122 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""IQ data preprocessing pipeline for Phase 6 Inference Service. 

2 

3Converts raw IQ data (time-domain) to mel-spectrogram features suitable for 

4neural network inference. Matches training pipeline from Phase 5. 

5""" 

6 

7import numpy as np 

8import logging 

9from typing import List, Tuple, Optional 

10from dataclasses import dataclass 

11 

12logger = logging.getLogger(__name__) 

13 

14 

@dataclass
class PreprocessingConfig:
    """Configuration for the IQ preprocessing pipeline.

    All values are validated in ``__post_init__``; an invalid combination
    raises ``ValueError`` at construction time.
    """

    # FFT parameters
    n_fft: int = 512  # samples per STFT frame (also the FFT size)
    hop_length: int = 128  # samples between the starts of consecutive frames

    # Mel-spectrogram parameters
    n_mels: int = 128  # number of mel filters (rows of the output)
    f_min: float = 0.0  # lower band edge, normalized frequency
    f_max: float = 0.5  # Normalized frequency (Nyquist = 0.5)
    # NOTE(review): `power` is only reported by IQPreprocessor.get_config_dict;
    # the spectrogram code always squares the magnitude. Confirm whether the
    # field is meant to be honoured.
    power: float = 2.0

    # Normalization
    normalize: bool = True  # apply mean/std normalization after log scaling
    # Fixed statistics; used only when BOTH are set, otherwise the statistics
    # are computed from each input.
    norm_mean: Optional[float] = None
    norm_std: Optional[float] = None

    def __post_init__(self):
        """Validate configuration.

        Raises:
            ValueError: If ``n_fft``, ``hop_length`` or ``n_mels`` is not
                positive, or if ``0 <= f_min < f_max <= 0.5`` does not hold.
        """
        if self.n_fft <= 0:
            raise ValueError(f"n_fft must be positive, got {self.n_fft}")
        # hop_length is the divisor when counting STFT frames, so a zero value
        # would otherwise only show up later as a ZeroDivisionError.
        if self.hop_length <= 0:
            raise ValueError(
                f"hop_length must be positive, got {self.hop_length}"
            )
        if self.n_mels <= 0:
            raise ValueError(f"n_mels must be positive, got {self.n_mels}")
        if not (0 <= self.f_min < self.f_max <= 0.5):
            raise ValueError(
                f"Invalid frequency range: f_min={self.f_min}, f_max={self.f_max}"
            )

42 

43 

class IQPreprocessor:
    """
    Preprocessing pipeline for IQ data.

    Converts:
    - Input: IQ samples as a sequence of [I, Q] pairs, i.e. shape (N, 2)
    - Output: Mel-spectrogram (shape: (n_mels, time_steps))

    Pipeline:
    1. Convert to complex IQ representation
    2. Compute power spectrogram (magnitude squared)
    3. Convert to mel scale
    4. Apply logarithmic scaling
    5. Normalize (optional)
    """

    def __init__(self, config: "Optional[PreprocessingConfig]" = None):
        """
        Initialize preprocessor.

        Args:
            config: PreprocessingConfig instance. Defaults to standard config.
        """
        self.config = PreprocessingConfig() if config is None else config
        # Built lazily on first use and reused for every later call.
        self._mel_fb: Optional[np.ndarray] = None
        logger.info("IQPreprocessor initialized with config: %s", self.config)

    def preprocess(self, iq_data: List[List[float]]) -> np.ndarray:
        """
        Preprocess raw IQ data to mel-spectrogram.

        Args:
            iq_data: List of [I, Q] samples. Shape: (N, 2) where N is number
                of samples, e.g. [[I1, Q1], [I2, Q2], ...]. N must be at
                least ``config.n_fft``.

        Returns:
            Mel-spectrogram: np.ndarray of shape (n_mels, time_steps)

        Raises:
            RuntimeError: If any step fails. Input validation errors
                (``ValueError``) are wrapped as well; the original exception
                is available as ``__cause__``.
        """
        try:
            # Step 1: Convert to complex IQ
            iq_complex = self._to_complex_iq(iq_data)
            logger.debug(
                "IQ shape: %s, dtype: %s", iq_complex.shape, iq_complex.dtype
            )

            # Step 2: Compute power spectrogram using FFT
            spectrogram = self._compute_spectrogram(iq_complex)
            logger.debug("Spectrogram shape: %s", spectrogram.shape)

            # Step 3: Convert to mel scale
            mel_spec = self._to_mel_scale(spectrogram)
            logger.debug("Mel-spectrogram shape: %s", mel_spec.shape)

            # Step 4: Apply log scaling
            mel_spec_log = self._apply_log_scale(mel_spec)

            # Step 5: Normalize if configured
            if self.config.normalize:
                mel_spec_log = self._normalize(mel_spec_log)

            return mel_spec_log

        except Exception as e:
            # Single failure type for callers; the traceback is logged here
            # and the cause is chained onto the RuntimeError.
            logger.error("Preprocessing failed: %s", e, exc_info=True)
            raise RuntimeError(f"IQ preprocessing error: {e}") from e

    def _to_complex_iq(self, iq_data: List[List[float]]) -> np.ndarray:
        """
        Convert IQ samples to complex representation.

        Args:
            iq_data: List of [I, Q] pairs or (N, 2) array

        Returns:
            Complex array: shape (N,)

        Raises:
            ValueError: If the data is not of shape (N, 2) or holds fewer
                than ``config.n_fft`` samples.
        """
        iq_array = np.array(iq_data, dtype=np.float32)

        # Validate shape
        if iq_array.ndim != 2 or iq_array.shape[1] != 2:
            raise ValueError(
                f"Expected (N, 2) array, got shape {iq_array.shape}. "
                f"Input should be list of [I, Q] pairs."
            )

        # At least one full STFT frame is required.
        if iq_array.shape[0] < self.config.n_fft:
            raise ValueError(
                f"Not enough samples: {iq_array.shape[0]} < n_fft={self.config.n_fft}. "
                f"Need at least {self.config.n_fft} samples."
            )

        in_phase = iq_array[:, 0]
        quadrature = iq_array[:, 1]

        # Complex: I + 1j*Q
        iq_complex = in_phase + 1j * quadrature

        logger.debug("Converted %s IQ samples to complex", len(iq_complex))
        return iq_complex

    def _compute_spectrogram(self, iq_complex: np.ndarray) -> np.ndarray:
        """
        Compute power spectrogram via STFT.

        Frames of ``n_fft`` samples are taken every ``hop_length`` samples,
        multiplied by a Hann window and transformed with an FFT. Trailing
        samples that do not fill a whole frame are ignored.

        Args:
            iq_complex: Complex IQ signal, shape (N,)

        Returns:
            Power spectrogram: float32, shape (n_fft//2 + 1, time_steps)
        """
        n_fft = self.config.n_fft
        hop_length = self.config.hop_length
        n_bins = n_fft // 2 + 1

        window = np.hanning(n_fft)

        n_frames = (len(iq_complex) - n_fft) // hop_length + 1
        spectrogram = np.zeros((n_bins, n_frames), dtype=np.float32)

        for i in range(n_frames):
            start = i * hop_length
            frame = iq_complex[start:start + n_fft] * window

            # NOTE(review): only bins 0..n_fft//2 of the complex FFT are kept,
            # so the remaining (negative-frequency) bins are discarded.
            # Presumably this mirrors the training pipeline - confirm before
            # changing.
            magnitude = np.abs(np.fft.fft(frame)[:n_bins])

            # Power (magnitude squared), scaled by the frame length.
            spectrogram[:, i] = (magnitude ** 2) / n_fft

        logger.debug("Computed STFT: %s frames of %s samples", n_frames, n_fft)
        return spectrogram

    def _to_mel_scale(self, spectrogram: np.ndarray) -> np.ndarray:
        """
        Convert power spectrogram to mel scale.

        Args:
            spectrogram: Linear spectrogram, shape (n_fft//2 + 1, time_steps)

        Returns:
            Mel-spectrogram: shape (n_mels, time_steps)
        """
        # Build mel filterbank if not cached
        if self._mel_fb is None:
            self._mel_fb = self._build_mel_filterbank()

        # (n_mels, n_fft//2+1) @ (n_fft//2+1, time_steps)
        mel_spec = np.dot(self._mel_fb, spectrogram)

        logger.debug("Converted to mel scale: %s", mel_spec.shape)
        return mel_spec

    def _build_mel_filterbank(self) -> np.ndarray:
        """
        Build mel filterbank matrix.

        ``n_mels + 2`` points are spaced evenly on the mel scale between
        ``f_min`` and ``f_max``, mapped back to frequency and then to FFT bin
        indices. Each filter rises from its left point to its centre and
        falls from its centre to its right point.

        Returns:
            Filterbank: shape (n_mels, n_fft//2 + 1)
        """
        n_mels = self.config.n_mels
        n_fft_bins = self.config.n_fft // 2 + 1

        # Create mel-spaced frequencies
        mel_points = np.linspace(
            self._hz_to_mel(self.config.f_min),
            self._hz_to_mel(self.config.f_max),
            n_mels + 2,
        )
        freq_points = np.array([self._mel_to_hz(m) for m in mel_points])

        # NOTE(review): frequencies are scaled by n_fft_bins, not n_fft, so
        # with f_max <= 0.5 no filter reaches past bin n_fft_bins // 2 and the
        # upper spectrogram rows get zero weight. Left as-is because the
        # output must match the training pipeline - confirm before changing.
        bin_points = np.array([
            int(np.floor(f * n_fft_bins)) for f in freq_points
        ])

        # Build triangular filterbank
        filterbank = np.zeros((n_mels, n_fft_bins))

        for m in range(n_mels):
            left = bin_points[m]
            center = bin_points[m + 1]
            right = bin_points[m + 2]

            # Left slope: 0 at `left`, rising towards `center`
            if center > left:
                filterbank[m, left:center] = (
                    np.arange(center - left) / (center - left)
                )

            # Right slope: 1 at `center`, falling towards `right`
            if right > center:
                filterbank[m, center:right] = (
                    np.arange(right - center, 0, -1) / (right - center)
                )

        logger.debug("Built mel filterbank: %s", filterbank.shape)
        return filterbank

    @staticmethod
    def _hz_to_mel(hz: float) -> float:
        """Convert Hz to mel scale."""
        return 2595 * np.log10(1 + hz / 700)

    @staticmethod
    def _mel_to_hz(mel: float) -> float:
        """Convert mel to Hz."""
        return 700 * (10 ** (mel / 2595) - 1)

    def _apply_log_scale(self, mel_spec: np.ndarray, epsilon: float = 1e-10) -> np.ndarray:
        """
        Apply logarithmic scaling.

        Args:
            mel_spec: Mel-spectrogram
            epsilon: Small value to avoid log(0)

        Returns:
            Log-scaled mel-spectrogram (natural logarithm)
        """
        mel_spec_log = np.log(mel_spec + epsilon)
        # min()/max() scan the whole array, so only pay for them when the
        # message will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Applied log scaling: min=%.3f, max=%.3f",
                mel_spec_log.min(),
                mel_spec_log.max(),
            )
        return mel_spec_log

    def _normalize(self, mel_spec_log: np.ndarray) -> np.ndarray:
        """
        Normalize mel-spectrogram.

        Uses ``config.norm_mean`` / ``config.norm_std`` when both are set;
        otherwise the mean and standard deviation of the input itself.

        Args:
            mel_spec_log: Log-scaled mel-spectrogram

        Returns:
            ``(mel_spec_log - mean) / std``, or the input unchanged when the
            standard deviation is zero.
        """
        if self.config.norm_mean is not None and self.config.norm_std is not None:
            mean = self.config.norm_mean
            std = self.config.norm_std
        else:
            mean = mel_spec_log.mean()
            std = mel_spec_log.std()

        if std == 0:
            logger.warning("Standard deviation is zero, skipping normalization")
            return mel_spec_log

        normalized = (mel_spec_log - mean) / std
        # mean()/std() are full passes over the data; skip them unless DEBUG
        # logging is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Normalized: mean=%.6f, std=%.6f",
                normalized.mean(),
                normalized.std(),
            )
        return normalized

    def get_config_dict(self) -> dict:
        """Return configuration as dictionary for metadata."""
        return {
            'n_fft': self.config.n_fft,
            'hop_length': self.config.hop_length,
            'n_mels': self.config.n_mels,
            'f_min': self.config.f_min,
            'f_max': self.config.f_max,
            'power': self.config.power,
            'normalize': self.config.normalize,
        }

323 

324 

def preprocess_iq_data(iq_data: List[List[float]]) -> Tuple[np.ndarray, dict]:
    """
    Run the default-configured preprocessing pipeline on IQ samples.

    A fresh ``IQPreprocessor`` with the default configuration is created for
    each call.

    Args:
        iq_data: List of [I, Q] samples

    Returns:
        Tuple of (mel_spectrogram, metadata_dict), where the metadata holds
        the result's shape, dtype name and min/max/mean/std as plain floats.
    """
    features = IQPreprocessor().preprocess(iq_data)
    summary = dict(
        shape=features.shape,
        dtype=str(features.dtype),
        min=float(features.min()),
        max=float(features.max()),
        mean=float(features.mean()),
        std=float(features.std()),
    )
    return features, summary