Coverage for services/training/src/data/features.py: 65%

101 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1""" 

2Feature extraction utilities for RF signal processing. 

3 

4Transforms raw IQ data from WebSDR receivers into mel-spectrograms for neural network input. 

5 

6Key functions: 

7- iq_to_mel_spectrogram(): Convert complex IQ samples to mel-scale spectrogram 

8- compute_mfcc(): Compute Mel-Frequency Cepstral Coefficients (optional) 

9- normalize_features(): Zero-mean, unit-variance normalization 

10""" 

11 

12import numpy as np 

13from scipy import signal 

14from scipy.fftpack import dct 

15import structlog 

16from typing import Tuple, Optional 

17 

18logger = structlog.get_logger(__name__) 

19 

20def MEL_SPECTROGRAM_SHAPE(n_mels: int = 128, n_frames: int = 2048) -> Tuple[int, int]: 

21 """Return the shape of the mel-spectrogram feature.""" 

22 return (n_mels, n_frames) 

23 

24def iq_to_mel_spectrogram( 

25 iq_data: np.ndarray, 

26 sample_rate: float = 192000.0, 

27 n_mels: int = 128, 

28 n_fft: int = 2048, 

29 hop_length: int = 512, 

30 f_min: float = 0.0, 

31 f_max: Optional[float] = None, 

32 window: str = 'hann', 

33) -> np.ndarray: 

34 """ 

35 Convert complex IQ data to mel-scale spectrogram. 

36  

37 This is the primary feature extraction function. It converts raw IQ samples 

38 from WebSDR receivers into a mel-scale spectrogram suitable for neural network input. 

39  

40 Args: 

41 iq_data (np.ndarray): Complex IQ samples, shape (n_samples,) or (n_receivers, n_samples) 

42 sample_rate (float): Sampling rate in Hz (default WebSDR: 192 kHz) 

43 n_mels (int): Number of mel-scale frequency bins (default: 128) 

44 n_fft (int): FFT size for STFT (default: 2048) 

45 hop_length (int): Number of samples between successive frames (default: 512) 

46 f_min (float): Minimum frequency in Hz 

47 f_max (Optional[float]): Maximum frequency in Hz (None = Nyquist) 

48 window (str): Window function ('hann', 'hamming', 'blackman', etc.) 

49  

50 Returns: 

51 np.ndarray: Mel-spectrogram, shape (n_mels, n_frames) or (n_receivers, n_mels, n_frames) 

52  

53 Example: 

54 >>> iq_data = np.random.randn(192000) + 1j * np.random.randn(192000) 

55 >>> mel_spec = iq_to_mel_spectrogram(iq_data) 

56 >>> print(mel_spec.shape) # (128, ~375) 

57 """ 

58 

59 if f_max is None: 

60 f_max = sample_rate / 2 # Nyquist frequency 

61 

62 # Ensure complex dtype 

63 iq_data = np.asarray(iq_data, dtype=np.complex128) 

64 

65 # Handle both 1D and 2D inputs 

66 if iq_data.ndim == 1: 

67 # Single channel: convert to magnitude spectrum 

68 magnitude = np.abs(iq_data) 

69 

70 # Compute Short-Time Fourier Transform (STFT) 

71 f, t, stft_result = signal.stft( 

72 magnitude, 

73 fs=sample_rate, 

74 window=window, 

75 nperseg=n_fft, 

76 noverlap=n_fft - hop_length, 

77 nfft=n_fft, 

78 ) 

79 

80 # Convert to power spectrogram 

81 power_spec = np.abs(stft_result) ** 2 

82 

83 # Create mel-scale filter bank 

84 mel_filterbank = librosa_compatible_mel_scale( 

85 sr=sample_rate, 

86 n_fft=n_fft, 

87 n_mels=n_mels, 

88 f_min=f_min, 

89 f_max=f_max, 

90 ) 

91 

92 # Apply mel-scale filter bank 

93 mel_spec = mel_filterbank @ power_spec 

94 

95 # Convert to log scale (add small epsilon to avoid log(0)) 

96 mel_spec_db = 10 * np.log10(mel_spec + 1e-9) 

97 

98 logger.debug( 

99 "iq_to_mel_spectrogram_1d", 

100 input_shape=iq_data.shape, 

101 output_shape=mel_spec_db.shape, 

102 sample_rate=sample_rate, 

103 n_mels=n_mels, 

104 ) 

105 

106 return mel_spec_db 

107 

108 elif iq_data.ndim == 2: 

109 # Multi-channel: process each channel separately and stack 

110 mel_specs = [] 

111 for i, channel in enumerate(iq_data): 

112 mel_spec = iq_to_mel_spectrogram( 

113 channel, 

114 sample_rate=sample_rate, 

115 n_mels=n_mels, 

116 n_fft=n_fft, 

117 hop_length=hop_length, 

118 f_min=f_min, 

119 f_max=f_max, 

120 window=window, 

121 ) 

122 mel_specs.append(mel_spec) 

123 

124 result = np.stack(mel_specs, axis=0) 

125 logger.debug("iq_to_mel_spectrogram_2d", input_shape=iq_data.shape, output_shape=result.shape) 

126 return result 

127 

128 else: 

129 raise ValueError(f"Unsupported input shape: {iq_data.shape}") 

130 

131 

132def librosa_compatible_mel_scale( 

133 sr: float, 

134 n_fft: int, 

135 n_mels: int = 128, 

136 f_min: float = 0.0, 

137 f_max: Optional[float] = None, 

138) -> np.ndarray: 

139 """ 

140 Create mel-scale filter bank compatible with librosa. 

141  

142 Implements triangular mel-scale filters using standard mel-scale frequency spacing. 

143  

144 Args: 

145 sr (float): Sample rate 

146 n_fft (int): FFT size 

147 n_mels (int): Number of mel bins 

148 f_min (float): Minimum frequency 

149 f_max (Optional[float]): Maximum frequency (None = Nyquist) 

150  

151 Returns: 

152 np.ndarray: Mel-scale filter bank, shape (n_mels, n_fft//2 + 1) 

153 """ 

154 if f_max is None: 

155 f_max = sr / 2 

156 

157 # Frequency bins in Hz 

158 freqs = np.fft.rfftfreq(n_fft, d=1/sr) 

159 

160 # Convert Hz to mel scale 

161 def hz_to_mel(hz): 

162 return 2595 * np.log10(1 + hz / 700) 

163 

164 def mel_to_hz(mel): 

165 return 700 * (10 ** (mel / 2595) - 1) 

166 

167 # Mel-scale points 

168 mel_min = hz_to_mel(f_min) 

169 mel_max = hz_to_mel(f_max) 

170 mel_points = np.linspace(mel_min, mel_max, n_mels + 2) 

171 hz_points = mel_to_hz(mel_points) 

172 

173 # Create triangular filter bank 

174 filterbank = np.zeros((n_mels, len(freqs))) 

175 

176 for m in range(n_mels): 

177 f_left = hz_points[m] 

178 f_center = hz_points[m + 1] 

179 f_right = hz_points[m + 2] 

180 

181 # Left side of triangle 

182 left_slope = 1.0 / (f_center - f_left) 

183 left_mask = (freqs >= f_left) & (freqs <= f_center) 

184 filterbank[m, left_mask] = left_slope * (freqs[left_mask] - f_left) 

185 

186 # Right side of triangle 

187 right_slope = 1.0 / (f_right - f_center) 

188 right_mask = (freqs > f_center) & (freqs <= f_right) 

189 filterbank[m, right_mask] = right_slope * (f_right - freqs[right_mask]) 

190 

191 return filterbank 

192 

193 

194def compute_mfcc( 

195 mel_spec_db: np.ndarray, 

196 n_mfcc: int = 13, 

197) -> np.ndarray: 

198 """ 

199 Compute Mel-Frequency Cepstral Coefficients (MFCCs) from mel-spectrogram. 

200  

201 MFCCs are useful for capturing spectral characteristics and are often used 

202 in audio processing. They can be used as an alternative or complementary 

203 feature to raw mel-spectrograms. 

204  

205 Args: 

206 mel_spec_db (np.ndarray): Log mel-spectrogram, shape (n_mels, n_frames) 

207 n_mfcc (int): Number of MFCC coefficients to extract (default: 13) 

208  

209 Returns: 

210 np.ndarray: MFCC coefficients, shape (n_mfcc, n_frames) 

211  

212 Example: 

213 >>> mel_spec = iq_to_mel_spectrogram(iq_data) 

214 >>> mfcc = compute_mfcc(mel_spec, n_mfcc=13) 

215 >>> print(mfcc.shape) # (13, ~375) 

216 """ 

217 # Discrete Cosine Transform (DCT) 

218 mfcc = dct(mel_spec_db, axis=0, type=2, norm='ortho')[:n_mfcc] 

219 

220 logger.debug( 

221 "compute_mfcc", 

222 input_shape=mel_spec_db.shape, 

223 output_shape=mfcc.shape, 

224 n_mfcc=n_mfcc, 

225 ) 

226 

227 return mfcc 

228 

229 

230def normalize_features( 

231 features: np.ndarray, 

232 axis: Optional[int] = None, 

233 epsilon: float = 1e-8, 

234) -> Tuple[np.ndarray, dict]: 

235 """ 

236 Normalize features to zero mean and unit variance. 

237  

238 This is typically applied per-sample (per spectrogram) or per-batch, 

239 and is crucial for neural network training stability. 

240  

241 Args: 

242 features (np.ndarray): Input features, any shape 

243 axis (Optional[int]): Axis over which to compute mean/std 

244 (None = compute over all elements) 

245 epsilon (float): Small value to avoid division by zero 

246  

247 Returns: 

248 Tuple[np.ndarray, dict]: 

249 - Normalized features (same shape as input) 

250 - Dict with statistics: {'mean': mean, 'std': std, 'min': min, 'max': max} 

251  

252 Example: 

253 >>> mel_spec = iq_to_mel_spectrogram(iq_data) 

254 >>> normalized, stats = normalize_features(mel_spec) 

255 """ 

256 

257 mean = np.mean(features, axis=axis, keepdims=True) 

258 std = np.std(features, axis=axis, keepdims=True) 

259 

260 # Avoid division by zero 

261 std = np.maximum(std, epsilon) 

262 

263 normalized = (features - mean) / std 

264 

265 stats = { 

266 'mean': np.squeeze(mean), 

267 'std': np.squeeze(std), 

268 'min': np.min(normalized), 

269 'max': np.max(normalized), 

270 } 

271 

272 logger.debug( 

273 "normalize_features", 

274 input_shape=features.shape, 

275 output_shape=normalized.shape, 

276 stats=stats, 

277 ) 

278 

279 return normalized, stats 

280 

281 

282def augment_iq_data( 

283 iq_data: np.ndarray, 

284 phase_shift_deg: float = 0.0, 

285 amplitude_noise_factor: float = 0.0, 

286) -> np.ndarray: 

287 """ 

288 Apply data augmentation to IQ data (optional enhancement). 

289  

290 This can be useful during training to improve robustness. 

291  

292 Args: 

293 iq_data (np.ndarray): Complex IQ samples 

294 phase_shift_deg (float): Random phase shift in degrees (0 = no augmentation) 

295 amplitude_noise_factor (float): Gaussian noise factor (0 = no augmentation) 

296  

297 Returns: 

298 np.ndarray: Augmented IQ data 

299 """ 

300 

301 augmented = iq_data.copy() 

302 

303 if phase_shift_deg > 0: 

304 phase_shift = np.random.uniform(-phase_shift_deg, phase_shift_deg) 

305 phase_shift_rad = np.radians(phase_shift) 

306 augmented = augmented * np.exp(1j * phase_shift_rad) 

307 

308 if amplitude_noise_factor > 0: 

309 noise = np.random.randn(*iq_data.shape) * amplitude_noise_factor 

310 augmented = augmented + noise * np.exp(1j * np.random.uniform(0, 2 * np.pi)) 

311 

312 logger.debug( 

313 "augment_iq_data", 

314 phase_shift_deg=phase_shift_deg, 

315 amplitude_noise_factor=amplitude_noise_factor, 

316 ) 

317 

318 return augmented 

319 

320 

321def verify_feature_extraction(): 

322 """Verification function for feature extraction pipeline.""" 

323 

324 logger.info("Starting feature extraction verification...") 

325 

326 # Create synthetic IQ data (1 second at 192 kHz) 

327 sample_rate = 192000 

328 duration_sec = 1.0 

329 n_samples = int(sample_rate * duration_sec) 

330 

331 # Synthetic signal: 1 kHz sine wave 

332 t = np.arange(n_samples) / sample_rate 

333 frequency = 1000 # Hz 

334 iq_data = np.exp(2j * np.pi * frequency * t) 

335 

336 # Add noise 

337 iq_data += 0.1 * (np.random.randn(n_samples) + 1j * np.random.randn(n_samples)) 

338 

339 logger.info("iq_data_created", shape=iq_data.shape, dtype=str(iq_data.dtype)) 

340 

341 # Extract features 

342 mel_spec = iq_to_mel_spectrogram(iq_data, sample_rate=sample_rate) 

343 logger.info("mel_spectrogram_extracted", shape=mel_spec.shape) 

344 

345 # Normalize 

346 normalized, stats = normalize_features(mel_spec) 

347 logger.info("features_normalized", stats=stats) 

348 

349 # Verify output shapes 

350 assert mel_spec.shape == (128, 375), f"Expected mel_spec shape (128, 375), got {mel_spec.shape}" 

351 assert normalized.shape == mel_spec.shape, "Normalized shape should match input" 

352 assert abs(np.mean(normalized)) < 0.1, "Normalized mean should be ~0" 

353 assert abs(np.std(normalized) - 1.0) < 0.1, "Normalized std should be ~1" 

354 

355 logger.info("✅ Feature extraction verification passed!") 

356 

357 return mel_spec, normalized 

358 

359 

360if __name__ == "__main__": 

361 import logging 

362 logging.basicConfig(level=logging.INFO) 

363 mel_spec, normalized = verify_feature_extraction()