Coverage for services/training/src/data/features.py: 65%
101 statements
1"""
2Feature extraction utilities for RF signal processing.
4Transforms raw IQ data from WebSDR receivers into mel-spectrograms for neural network input.
6Key functions:
7- iq_to_mel_spectrogram(): Convert complex IQ samples to mel-scale spectrogram
8- compute_mfcc(): Compute Mel-Frequency Cepstral Coefficients (optional)
9- normalize_features(): Zero-mean, unit-variance normalization
10"""

import numpy as np
from scipy import signal
from scipy.fftpack import dct
import structlog
from typing import Tuple, Optional

logger = structlog.get_logger(__name__)


def MEL_SPECTROGRAM_SHAPE(n_mels: int = 128, n_frames: int = 2048) -> Tuple[int, int]:
    """Return the shape of the mel-spectrogram feature."""
    return (n_mels, n_frames)


def iq_to_mel_spectrogram(
    iq_data: np.ndarray,
    sample_rate: float = 192000.0,
    n_mels: int = 128,
    n_fft: int = 2048,
    hop_length: int = 512,
    f_min: float = 0.0,
    f_max: Optional[float] = None,
    window: str = 'hann',
) -> np.ndarray:
    """
    Convert complex IQ data to a mel-scale spectrogram.

    This is the primary feature extraction function. It converts raw IQ samples
    from WebSDR receivers into a log-scaled mel spectrogram suitable for neural
    network input.

    Args:
        iq_data (np.ndarray): Complex IQ samples, shape (n_samples,) or (n_receivers, n_samples)
        sample_rate (float): Sampling rate in Hz (default WebSDR rate: 192 kHz)
        n_mels (int): Number of mel-scale frequency bins (default: 128)
        n_fft (int): FFT size for the STFT (default: 2048)
        hop_length (int): Number of samples between successive frames (default: 512)
        f_min (float): Minimum frequency in Hz
        f_max (Optional[float]): Maximum frequency in Hz (None = Nyquist)
        window (str): Window function ('hann', 'hamming', 'blackman', etc.)

    Returns:
        np.ndarray: Log mel spectrogram in dB, shape (n_mels, n_frames) or
            (n_receivers, n_mels, n_frames)

    Example:
        >>> iq_data = np.random.randn(192000) + 1j * np.random.randn(192000)
        >>> mel_spec = iq_to_mel_spectrogram(iq_data)
        >>> print(mel_spec.shape)  # (128, ~375)
    """
    if f_max is None:
        f_max = sample_rate / 2  # Nyquist frequency

    # Ensure complex dtype
    iq_data = np.asarray(iq_data, dtype=np.complex128)

    # Handle both 1D and 2D inputs
    if iq_data.ndim == 1:
        # Single channel: take the magnitude envelope so the STFT input is real
        # and its one-sided spectrum matches the mel filter bank below
        magnitude = np.abs(iq_data)

        # Compute Short-Time Fourier Transform (STFT)
        f, t, stft_result = signal.stft(
            magnitude,
            fs=sample_rate,
            window=window,
            nperseg=n_fft,
            noverlap=n_fft - hop_length,
            nfft=n_fft,
        )

        # Convert to power spectrogram
        power_spec = np.abs(stft_result) ** 2

        # Create mel-scale filter bank
        mel_filterbank = librosa_compatible_mel_scale(
            sr=sample_rate,
            n_fft=n_fft,
            n_mels=n_mels,
            f_min=f_min,
            f_max=f_max,
        )

        # Apply mel-scale filter bank
        mel_spec = mel_filterbank @ power_spec

        # Convert to log scale (add small epsilon to avoid log(0))
        mel_spec_db = 10 * np.log10(mel_spec + 1e-9)

        logger.debug(
            "iq_to_mel_spectrogram_1d",
            input_shape=iq_data.shape,
            output_shape=mel_spec_db.shape,
            sample_rate=sample_rate,
            n_mels=n_mels,
        )

        return mel_spec_db

    elif iq_data.ndim == 2:
        # Multi-channel: process each channel separately and stack
        mel_specs = []
        for channel in iq_data:
            mel_spec = iq_to_mel_spectrogram(
                channel,
                sample_rate=sample_rate,
                n_mels=n_mels,
                n_fft=n_fft,
                hop_length=hop_length,
                f_min=f_min,
                f_max=f_max,
                window=window,
            )
            mel_specs.append(mel_spec)

        result = np.stack(mel_specs, axis=0)
        logger.debug("iq_to_mel_spectrogram_2d", input_shape=iq_data.shape, output_shape=result.shape)
        return result

    else:
        raise ValueError(f"Unsupported input shape: {iq_data.shape}")
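

# A minimal sanity-check sketch for the function above. It assumes the module defaults
# (192 kHz sample rate, n_fft=2048, hop_length=512) and only checks the output rank and
# the mel-bin count, since the exact frame count depends on how scipy.signal.stft pads
# the signal boundaries. The helper name is illustrative only and is not called anywhere.
def _sketch_iq_to_mel_spectrogram_shapes() -> None:
    """Illustrative shape check for iq_to_mel_spectrogram (hypothetical helper)."""
    rng = np.random.default_rng(0)
    iq = rng.standard_normal(192000) + 1j * rng.standard_normal(192000)

    # Single channel -> (n_mels, n_frames)
    mel_1d = iq_to_mel_spectrogram(iq)
    assert mel_1d.ndim == 2 and mel_1d.shape[0] == 128

    # Two stacked receivers -> (n_receivers, n_mels, n_frames)
    mel_2d = iq_to_mel_spectrogram(np.stack([iq, iq]))
    assert mel_2d.shape == (2, *mel_1d.shape)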


def librosa_compatible_mel_scale(
    sr: float,
    n_fft: int,
    n_mels: int = 128,
    f_min: float = 0.0,
    f_max: Optional[float] = None,
) -> np.ndarray:
    """
    Create a triangular mel-scale filter bank (librosa-style, HTK mel formula).

    Implements triangular mel filters spaced uniformly on the HTK mel scale
    (2595 * log10(1 + f / 700)). Unlike librosa's default filter bank, no Slaney
    area normalization is applied to the filters.

    Args:
        sr (float): Sample rate
        n_fft (int): FFT size
        n_mels (int): Number of mel bins
        f_min (float): Minimum frequency
        f_max (Optional[float]): Maximum frequency (None = Nyquist)

    Returns:
        np.ndarray: Mel-scale filter bank, shape (n_mels, n_fft//2 + 1)
    """
    if f_max is None:
        f_max = sr / 2

    # Frequency bins in Hz
    freqs = np.fft.rfftfreq(n_fft, d=1/sr)

    # Convert Hz to mel scale
    def hz_to_mel(hz):
        return 2595 * np.log10(1 + hz / 700)

    def mel_to_hz(mel):
        return 700 * (10 ** (mel / 2595) - 1)

    # Mel-scale points: n_mels + 2 edges define n_mels overlapping triangles
    mel_min = hz_to_mel(f_min)
    mel_max = hz_to_mel(f_max)
    mel_points = np.linspace(mel_min, mel_max, n_mels + 2)
    hz_points = mel_to_hz(mel_points)

    # Create triangular filter bank
    filterbank = np.zeros((n_mels, len(freqs)))

    for m in range(n_mels):
        f_left = hz_points[m]
        f_center = hz_points[m + 1]
        f_right = hz_points[m + 2]

        # Left side of triangle: ramp up from f_left to f_center
        left_slope = 1.0 / (f_center - f_left)
        left_mask = (freqs >= f_left) & (freqs <= f_center)
        filterbank[m, left_mask] = left_slope * (freqs[left_mask] - f_left)

        # Right side of triangle: ramp down from f_center to f_right
        right_slope = 1.0 / (f_right - f_center)
        right_mask = (freqs > f_center) & (freqs <= f_right)
        filterbank[m, right_mask] = right_slope * (f_right - freqs[right_mask])

    return filterbank
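

# A small sketch of how the filter bank above is meant to be used: with n_fft=2048 the
# one-sided spectrum has n_fft // 2 + 1 = 1025 bins, so the bank maps a (1025, n_frames)
# power spectrogram onto (n_mels, n_frames). The helper name and the chosen parameters
# are illustrative only.
def _sketch_mel_filterbank_shapes() -> None:
    """Illustrative shape/positivity check for the mel filter bank (hypothetical helper)."""
    fb = librosa_compatible_mel_scale(sr=192000.0, n_fft=2048, n_mels=128)
    assert fb.shape == (128, 2048 // 2 + 1)
    # Triangular filters never go negative.
    assert np.all(fb >= 0.0)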


def compute_mfcc(
    mel_spec_db: np.ndarray,
    n_mfcc: int = 13,
) -> np.ndarray:
    """
    Compute Mel-Frequency Cepstral Coefficients (MFCCs) from a mel-spectrogram.

    MFCCs are useful for capturing spectral characteristics and are often used
    in audio processing. They can be used as an alternative or complementary
    feature to raw mel-spectrograms.

    Args:
        mel_spec_db (np.ndarray): Log mel-spectrogram, shape (n_mels, n_frames)
        n_mfcc (int): Number of MFCC coefficients to extract (default: 13)

    Returns:
        np.ndarray: MFCC coefficients, shape (n_mfcc, n_frames)

    Example:
        >>> mel_spec = iq_to_mel_spectrogram(iq_data)
        >>> mfcc = compute_mfcc(mel_spec, n_mfcc=13)
        >>> print(mfcc.shape)  # (13, ~375)
    """
    # Discrete Cosine Transform (DCT) along the mel axis, keep the first n_mfcc coefficients
    mfcc = dct(mel_spec_db, axis=0, type=2, norm='ortho')[:n_mfcc]

    logger.debug(
        "compute_mfcc",
        input_shape=mel_spec_db.shape,
        output_shape=mfcc.shape,
        n_mfcc=n_mfcc,
    )

    return mfcc
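

# A brief sketch of the MFCC contract: the type-II DCT runs along the mel axis and only
# the first n_mfcc coefficients are kept, so the frequency dimension shrinks while the
# frame count is preserved. The helper below is illustrative and uses a toy
# (128, 10) log mel spectrogram.
def _sketch_compute_mfcc_shape() -> None:
    """Illustrative shape check for compute_mfcc (hypothetical helper)."""
    fake_mel_db = np.zeros((128, 10))
    mfcc = compute_mfcc(fake_mel_db, n_mfcc=13)
    assert mfcc.shape == (13, 10)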


def normalize_features(
    features: np.ndarray,
    axis: Optional[int] = None,
    epsilon: float = 1e-8,
) -> Tuple[np.ndarray, dict]:
    """
    Normalize features to zero mean and unit variance.

    This is typically applied per-sample (per spectrogram) or per-batch,
    and is crucial for neural network training stability.

    Args:
        features (np.ndarray): Input features, any shape
        axis (Optional[int]): Axis over which to compute mean/std
            (None = compute over all elements)
        epsilon (float): Small value to avoid division by zero

    Returns:
        Tuple[np.ndarray, dict]:
            - Normalized features (same shape as input)
            - Dict with statistics: {'mean': mean, 'std': std, 'min': min, 'max': max}

    Example:
        >>> mel_spec = iq_to_mel_spectrogram(iq_data)
        >>> normalized, stats = normalize_features(mel_spec)
    """
    mean = np.mean(features, axis=axis, keepdims=True)
    std = np.std(features, axis=axis, keepdims=True)

    # Avoid division by zero
    std = np.maximum(std, epsilon)

    normalized = (features - mean) / std

    stats = {
        'mean': np.squeeze(mean),
        'std': np.squeeze(std),
        'min': np.min(normalized),
        'max': np.max(normalized),
    }

    logger.debug(
        "normalize_features",
        input_shape=features.shape,
        output_shape=normalized.shape,
        stats=stats,
    )

    return normalized, stats
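

# A small sketch of the normalization contract: after the transform the global mean is
# ~0 and the global std is ~1 (provided the input std is well above epsilon), and the
# returned stats can be reused to reproduce the same scaling. Names are illustrative only.
def _sketch_normalize_features() -> None:
    """Illustrative check of the zero-mean / unit-variance contract (hypothetical helper)."""
    rng = np.random.default_rng(1)
    feats = rng.normal(loc=5.0, scale=3.0, size=(128, 375))
    normalized, stats = normalize_features(feats)
    assert abs(float(np.mean(normalized))) < 1e-6
    assert abs(float(np.std(normalized)) - 1.0) < 1e-6
    # Reapplying the stored statistics reproduces the same normalization.
    assert np.allclose((feats - stats['mean']) / stats['std'], normalized)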


def augment_iq_data(
    iq_data: np.ndarray,
    phase_shift_deg: float = 0.0,
    amplitude_noise_factor: float = 0.0,
) -> np.ndarray:
    """
    Apply data augmentation to IQ data (optional enhancement).

    This can be useful during training to improve robustness.

    Args:
        iq_data (np.ndarray): Complex IQ samples
        phase_shift_deg (float): Maximum random phase shift in degrees (0 = no augmentation)
        amplitude_noise_factor (float): Gaussian noise scale factor (0 = no augmentation)

    Returns:
        np.ndarray: Augmented IQ data
    """
    augmented = iq_data.copy()

    if phase_shift_deg > 0:
        # Rotate the whole capture by a random phase drawn from [-phase_shift_deg, +phase_shift_deg]
        phase_shift = np.random.uniform(-phase_shift_deg, phase_shift_deg)
        phase_shift_rad = np.radians(phase_shift)
        augmented = augmented * np.exp(1j * phase_shift_rad)

    if amplitude_noise_factor > 0:
        # Add Gaussian amplitude noise along a single random phase direction
        noise = np.random.randn(*iq_data.shape) * amplitude_noise_factor
        augmented = augmented + noise * np.exp(1j * np.random.uniform(0, 2 * np.pi))

    logger.debug(
        "augment_iq_data",
        phase_shift_deg=phase_shift_deg,
        amplitude_noise_factor=amplitude_noise_factor,
    )

    return augmented
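

# A minimal sketch of the phase-shift augmentation: a pure phase rotation preserves the
# per-sample magnitude, so with amplitude_noise_factor=0 the signal power is unchanged.
# The helper name is illustrative only.
def _sketch_augment_phase_preserves_power() -> None:
    """Illustrative check that phase-only augmentation preserves magnitude (hypothetical helper)."""
    rng = np.random.default_rng(2)
    iq = rng.standard_normal(1024) + 1j * rng.standard_normal(1024)
    rotated = augment_iq_data(iq, phase_shift_deg=45.0, amplitude_noise_factor=0.0)
    assert np.allclose(np.abs(rotated), np.abs(iq))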


def verify_feature_extraction():
    """Verification function for the feature extraction pipeline."""
    logger.info("Starting feature extraction verification...")

    # Create synthetic IQ data (1 second at 192 kHz)
    sample_rate = 192000
    duration_sec = 1.0
    n_samples = int(sample_rate * duration_sec)

    # Synthetic signal: 1 kHz complex exponential
    t = np.arange(n_samples) / sample_rate
    frequency = 1000  # Hz
    iq_data = np.exp(2j * np.pi * frequency * t)

    # Add noise
    iq_data += 0.1 * (np.random.randn(n_samples) + 1j * np.random.randn(n_samples))

    logger.info("iq_data_created", shape=iq_data.shape, dtype=str(iq_data.dtype))

    # Extract features
    mel_spec = iq_to_mel_spectrogram(iq_data, sample_rate=sample_rate)
    logger.info("mel_spectrogram_extracted", shape=mel_spec.shape)

    # Normalize
    normalized, stats = normalize_features(mel_spec)
    logger.info("features_normalized", stats=stats)

    # Verify output shapes. The exact frame count depends on scipy's STFT boundary
    # padding, so check the mel-bin count exactly and the frame count approximately.
    expected_frames = n_samples // 512  # hop_length used by iq_to_mel_spectrogram defaults
    assert mel_spec.shape[0] == 128, f"Expected 128 mel bins, got {mel_spec.shape[0]}"
    assert abs(mel_spec.shape[1] - expected_frames) <= 2, (
        f"Expected ~{expected_frames} frames, got {mel_spec.shape[1]}"
    )
    assert normalized.shape == mel_spec.shape, "Normalized shape should match input"
    assert abs(np.mean(normalized)) < 0.1, "Normalized mean should be ~0"
    assert abs(np.std(normalized) - 1.0) < 0.1, "Normalized std should be ~1"

    logger.info("✅ Feature extraction verification passed!")

    return mel_spec, normalized


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    mel_spec, normalized = verify_feature_extraction()