Coverage for services/rf-acquisition/src/processors/iq_processor.py: 79%

72 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""IQ data processing and signal metrics computation.""" 

2 

3import logging 

4from typing import Tuple 

5import numpy as np 

6from scipy import signal 

7from scipy.fft import fft, fftfreq 

8 

9from ..models.websdrs import SignalMetrics 

10 

11logger = logging.getLogger(__name__) 

12 

13 

14class IQProcessor: 

15 """Process IQ data and compute signal metrics.""" 

16 

17 @staticmethod 

18 def compute_metrics( 

19 iq_data: np.ndarray, 

20 sample_rate_hz: int, 

21 target_frequency_hz: int, 

22 noise_bandwidth_hz: int = 10000, 

23 ) -> SignalMetrics: 

24 """ 

25 Compute signal metrics from IQ data. 

26  

27 Args: 

28 iq_data: Complex64 IQ data array 

29 sample_rate_hz: Sample rate in Hz 

30 target_frequency_hz: Target frequency in Hz 

31 noise_bandwidth_hz: Noise measurement bandwidth in Hz 

32  

33 Returns: 

34 SignalMetrics object with computed metrics 

35 """ 

36 if len(iq_data) == 0: 

37 raise ValueError("Empty IQ data") 

38 

39 # Normalize IQ data 

40 iq_normalized = iq_data / (np.max(np.abs(iq_data)) + 1e-10) 

41 

42 # Compute power spectrum 

43 psd_db, freqs = IQProcessor._compute_psd(iq_normalized, sample_rate_hz) 

44 

45 # Find frequency offset 

46 frequency_offset_hz = IQProcessor._estimate_frequency_offset( 

47 iq_normalized, 

48 sample_rate_hz, 

49 target_frequency_hz 

50 ) 

51 

52 # Compute SNR 

53 signal_power_db, noise_power_db = IQProcessor._compute_snr( 

54 psd_db, 

55 freqs, 

56 target_frequency_hz, 

57 noise_bandwidth_hz 

58 ) 

59 

60 snr_db = signal_power_db - noise_power_db 

61 

62 # Average PSD at center frequency 

63 center_idx = np.argmin(np.abs(freqs - 0)) # Baseband center 

64 psd_dbm = psd_db[center_idx] 

65 

66 logger.debug( 

67 "Computed metrics - SNR: %.2f dB, PSD: %.2f dBm, Freq Offset: %.2f Hz", 

68 snr_db, 

69 psd_dbm, 

70 frequency_offset_hz 

71 ) 

72 

73 return SignalMetrics( 

74 snr_db=float(snr_db), 

75 psd_dbm=float(psd_dbm), 

76 frequency_offset_hz=float(frequency_offset_hz), 

77 signal_power_dbm=float(signal_power_db), 

78 noise_power_dbm=float(noise_power_db) 

79 ) 

80 

81 @staticmethod 

82 def _compute_psd( 

83 iq_data: np.ndarray, 

84 sample_rate_hz: int, 

85 nperseg: int = 1024, 

86 ) -> Tuple[np.ndarray, np.ndarray]: 

87 """ 

88 Compute Power Spectral Density using Welch's method. 

89  

90 Returns: 

91 Tuple of (PSD in dB, frequencies) 

92 """ 

93 # Use Welch's method for stable PSD estimate 

94 freqs, psd = signal.welch( 

95 iq_data, 

96 fs=sample_rate_hz, 

97 nperseg=min(nperseg, len(iq_data)), 

98 scaling='density', 

99 window='hann' 

100 ) 

101 

102 # Convert to dB (reference 1 Watt) 

103 psd_db = 10 * np.log10(psd + 1e-12) 

104 

105 return psd_db, freqs 

106 

107 @staticmethod 

108 def _estimate_frequency_offset( 

109 iq_data: np.ndarray, 

110 sample_rate_hz: int, 

111 target_frequency_hz: int, 

112 ) -> float: 

113 """ 

114 Estimate frequency offset using Phase Locked Loop (PLL) technique. 

115  

116 Returns: 

117 Frequency offset in Hz 

118 """ 

119 # Simple frequency offset estimation using FFT 

120 # Find peak in power spectrum 

121 n = len(iq_data) 

122 fft_result = fft(iq_data, n=2**int(np.ceil(np.log2(n)))) 

123 freqs = fftfreq(len(fft_result), 1/sample_rate_hz) 

124 

125 # Only consider frequencies near baseband (within ±sample_rate/4) 

126 valid_range = sample_rate_hz / 4 

127 mask = np.abs(freqs) <= valid_range 

128 

129 power_spectrum = np.abs(fft_result[mask]) ** 2 

130 freqs_masked = freqs[mask] 

131 

132 if np.max(power_spectrum) == 0: 

133 return 0.0 

134 

135 # Find peak frequency 

136 peak_idx = np.argmax(power_spectrum) 

137 estimated_offset = freqs_masked[peak_idx] 

138 

139 return float(estimated_offset) 

140 

141 @staticmethod 

142 def _compute_snr( 

143 psd_db: np.ndarray, 

144 freqs: np.ndarray, 

145 target_frequency_hz: int, 

146 noise_bandwidth_hz: int = 10000, 

147 ) -> Tuple[float, float]: 

148 """ 

149 Compute Signal and Noise power from PSD. 

150  

151 Returns: 

152 Tuple of (signal_power_db, noise_power_db) 

153 """ 

154 # Signal region: center ± noise_bandwidth / 2 

155 signal_mask = np.abs(freqs - 0) <= (noise_bandwidth_hz / 2) 

156 signal_power_db = float(np.mean(psd_db[signal_mask])) 

157 

158 # Noise region: use edges of spectrum 

159 edge_width = int(0.1 * len(psd_db)) 

160 noise_region = np.concatenate([ 

161 psd_db[:edge_width], 

162 psd_db[-edge_width:] 

163 ]) 

164 noise_power_db = float(np.mean(noise_region)) 

165 

166 return signal_power_db, noise_power_db 

167 

168 @staticmethod 

169 def save_iq_data_hdf5( 

170 iq_data: np.ndarray, 

171 filename: str, 

172 metadata: dict = None, 

173 ): 

174 """ 

175 Save IQ data to HDF5 file with metadata. 

176  

177 Args: 

178 iq_data: Complex64 IQ data array 

179 filename: Output HDF5 filename 

180 metadata: Optional metadata dictionary 

181 """ 

182 try: 

183 import h5py 

184 except ImportError: 

185 logger.warning("h5py not installed, skipping HDF5 save") 

186 return 

187 

188 with h5py.File(filename, 'w') as f: 

189 # Save IQ data as separate I and Q arrays 

190 f.create_dataset('I', data=np.real(iq_data), dtype=np.float32) 

191 f.create_dataset('Q', data=np.imag(iq_data), dtype=np.float32) 

192 

193 # Save metadata 

194 if metadata: 

195 meta_group = f.create_group('metadata') 

196 for key, value in metadata.items(): 

197 if isinstance(value, (str, int, float, bool)): 

198 meta_group.attrs[key] = value 

199 

200 logger.info("Saved IQ data to %s", filename) 

201 

202 @staticmethod 

203 def save_iq_data_npy( 

204 iq_data: np.ndarray, 

205 filename: str, 

206 metadata: dict = None, 

207 ): 

208 """ 

209 Save IQ data to NPY file with optional metadata JSON. 

210  

211 Args: 

212 iq_data: Complex64 IQ data array 

213 filename: Output NPY filename (without extension) 

214 metadata: Optional metadata dictionary 

215 """ 

216 import json 

217 

218 # Save IQ data 

219 np.save(f"{filename}.npy", iq_data.astype(np.complex64)) 

220 

221 # Save metadata as JSON 

222 if metadata: 

223 with open(f"{filename}_meta.json", 'w') as f: 

224 json.dump(metadata, f, indent=2, default=str) 

225 

226 logger.info("Saved IQ data to %s.npy", filename)