Coverage for services/rf-acquisition/src/processors/iq_processor.py: 79%
72 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""IQ data processing and signal metrics computation."""
3import logging
4from typing import Tuple
5import numpy as np
6from scipy import signal
7from scipy.fft import fft, fftfreq
9from ..models.websdrs import SignalMetrics
11logger = logging.getLogger(__name__)
14class IQProcessor:
15 """Process IQ data and compute signal metrics."""
17 @staticmethod
18 def compute_metrics(
19 iq_data: np.ndarray,
20 sample_rate_hz: int,
21 target_frequency_hz: int,
22 noise_bandwidth_hz: int = 10000,
23 ) -> SignalMetrics:
24 """
25 Compute signal metrics from IQ data.
27 Args:
28 iq_data: Complex64 IQ data array
29 sample_rate_hz: Sample rate in Hz
30 target_frequency_hz: Target frequency in Hz
31 noise_bandwidth_hz: Noise measurement bandwidth in Hz
33 Returns:
34 SignalMetrics object with computed metrics
35 """
36 if len(iq_data) == 0:
37 raise ValueError("Empty IQ data")
39 # Normalize IQ data
40 iq_normalized = iq_data / (np.max(np.abs(iq_data)) + 1e-10)
42 # Compute power spectrum
43 psd_db, freqs = IQProcessor._compute_psd(iq_normalized, sample_rate_hz)
45 # Find frequency offset
46 frequency_offset_hz = IQProcessor._estimate_frequency_offset(
47 iq_normalized,
48 sample_rate_hz,
49 target_frequency_hz
50 )
52 # Compute SNR
53 signal_power_db, noise_power_db = IQProcessor._compute_snr(
54 psd_db,
55 freqs,
56 target_frequency_hz,
57 noise_bandwidth_hz
58 )
60 snr_db = signal_power_db - noise_power_db
62 # Average PSD at center frequency
63 center_idx = np.argmin(np.abs(freqs - 0)) # Baseband center
64 psd_dbm = psd_db[center_idx]
66 logger.debug(
67 "Computed metrics - SNR: %.2f dB, PSD: %.2f dBm, Freq Offset: %.2f Hz",
68 snr_db,
69 psd_dbm,
70 frequency_offset_hz
71 )
73 return SignalMetrics(
74 snr_db=float(snr_db),
75 psd_dbm=float(psd_dbm),
76 frequency_offset_hz=float(frequency_offset_hz),
77 signal_power_dbm=float(signal_power_db),
78 noise_power_dbm=float(noise_power_db)
79 )
81 @staticmethod
82 def _compute_psd(
83 iq_data: np.ndarray,
84 sample_rate_hz: int,
85 nperseg: int = 1024,
86 ) -> Tuple[np.ndarray, np.ndarray]:
87 """
88 Compute Power Spectral Density using Welch's method.
90 Returns:
91 Tuple of (PSD in dB, frequencies)
92 """
93 # Use Welch's method for stable PSD estimate
94 freqs, psd = signal.welch(
95 iq_data,
96 fs=sample_rate_hz,
97 nperseg=min(nperseg, len(iq_data)),
98 scaling='density',
99 window='hann'
100 )
102 # Convert to dB (reference 1 Watt)
103 psd_db = 10 * np.log10(psd + 1e-12)
105 return psd_db, freqs
107 @staticmethod
108 def _estimate_frequency_offset(
109 iq_data: np.ndarray,
110 sample_rate_hz: int,
111 target_frequency_hz: int,
112 ) -> float:
113 """
114 Estimate frequency offset using Phase Locked Loop (PLL) technique.
116 Returns:
117 Frequency offset in Hz
118 """
119 # Simple frequency offset estimation using FFT
120 # Find peak in power spectrum
121 n = len(iq_data)
122 fft_result = fft(iq_data, n=2**int(np.ceil(np.log2(n))))
123 freqs = fftfreq(len(fft_result), 1/sample_rate_hz)
125 # Only consider frequencies near baseband (within ±sample_rate/4)
126 valid_range = sample_rate_hz / 4
127 mask = np.abs(freqs) <= valid_range
129 power_spectrum = np.abs(fft_result[mask]) ** 2
130 freqs_masked = freqs[mask]
132 if np.max(power_spectrum) == 0:
133 return 0.0
135 # Find peak frequency
136 peak_idx = np.argmax(power_spectrum)
137 estimated_offset = freqs_masked[peak_idx]
139 return float(estimated_offset)
141 @staticmethod
142 def _compute_snr(
143 psd_db: np.ndarray,
144 freqs: np.ndarray,
145 target_frequency_hz: int,
146 noise_bandwidth_hz: int = 10000,
147 ) -> Tuple[float, float]:
148 """
149 Compute Signal and Noise power from PSD.
151 Returns:
152 Tuple of (signal_power_db, noise_power_db)
153 """
154 # Signal region: center ± noise_bandwidth / 2
155 signal_mask = np.abs(freqs - 0) <= (noise_bandwidth_hz / 2)
156 signal_power_db = float(np.mean(psd_db[signal_mask]))
158 # Noise region: use edges of spectrum
159 edge_width = int(0.1 * len(psd_db))
160 noise_region = np.concatenate([
161 psd_db[:edge_width],
162 psd_db[-edge_width:]
163 ])
164 noise_power_db = float(np.mean(noise_region))
166 return signal_power_db, noise_power_db
168 @staticmethod
169 def save_iq_data_hdf5(
170 iq_data: np.ndarray,
171 filename: str,
172 metadata: dict = None,
173 ):
174 """
175 Save IQ data to HDF5 file with metadata.
177 Args:
178 iq_data: Complex64 IQ data array
179 filename: Output HDF5 filename
180 metadata: Optional metadata dictionary
181 """
182 try:
183 import h5py
184 except ImportError:
185 logger.warning("h5py not installed, skipping HDF5 save")
186 return
188 with h5py.File(filename, 'w') as f:
189 # Save IQ data as separate I and Q arrays
190 f.create_dataset('I', data=np.real(iq_data), dtype=np.float32)
191 f.create_dataset('Q', data=np.imag(iq_data), dtype=np.float32)
193 # Save metadata
194 if metadata:
195 meta_group = f.create_group('metadata')
196 for key, value in metadata.items():
197 if isinstance(value, (str, int, float, bool)):
198 meta_group.attrs[key] = value
200 logger.info("Saved IQ data to %s", filename)
202 @staticmethod
203 def save_iq_data_npy(
204 iq_data: np.ndarray,
205 filename: str,
206 metadata: dict = None,
207 ):
208 """
209 Save IQ data to NPY file with optional metadata JSON.
211 Args:
212 iq_data: Complex64 IQ data array
213 filename: Output NPY filename (without extension)
214 metadata: Optional metadata dictionary
215 """
216 import json
218 # Save IQ data
219 np.save(f"{filename}.npy", iq_data.astype(np.complex64))
221 # Save metadata as JSON
222 if metadata:
223 with open(f"{filename}_meta.json", 'w') as f:
224 json.dump(metadata, f, indent=2, default=str)
226 logger.info("Saved IQ data to %s.npy", filename)