Coverage for services/rf-acquisition/src/fetchers/websdr_fetcher.py: 71%

101 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""WebSDR data fetcher for simultaneous IQ acquisition from multiple receivers.""" 

2 

3import asyncio 

4import logging 

5import struct 

6from datetime import datetime 

7from typing import Dict, List, Optional, Tuple 

8import aiohttp 

9import numpy as np 

10 

11from ..models.websdrs import WebSDRConfig, IQDataPoint 

12 

13logger = logging.getLogger(__name__) 

14 

15 

class WebSDRFetcher:
    """Fetch IQ data simultaneously from multiple WebSDR receivers.

    The fetcher must be used as an async context manager: the aiohttp
    session is created in ``__aenter__`` and closed in ``__aexit__``.
    """

    # WebSDR API endpoint pattern: http://<host>:<port>/iq
    # Request parameters: tune=<freq_hz>, rate=<sample_rate_hz>
    # Response: binary IQ data (alternating int16 I and Q samples)

    def __init__(
        self,
        websdrs: "List[WebSDRConfig]",
        timeout: int = 30,
        retry_count: int = 3,
        concurrent_limit: int = 7,
    ):
        """
        Initialize WebSDR fetcher.

        Args:
            websdrs: List of WebSDRConfig objects; only entries whose
                ``is_active`` flag is truthy are kept, keyed by ``id``.
            timeout: Total request timeout in seconds
            retry_count: Number of attempts per receiver
            concurrent_limit: Max concurrent requests
        """
        self.websdrs = {ws.id: ws for ws in websdrs if ws.is_active}
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_count = retry_count
        self.concurrent_limit = concurrent_limit
        self.session: Optional[aiohttp.ClientSession] = None
        self.semaphore = asyncio.Semaphore(concurrent_limit)

    async def __aenter__(self):
        """Context manager entry: open the shared HTTP session."""
        connector = aiohttp.TCPConnector(
            limit=self.concurrent_limit,
            limit_per_host=2,
            ttl_dns_cache=300,
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the session and forget it."""
        # Reset the attribute so the "not in context manager" guards fire
        # again after exit instead of operating on a closed session.
        session, self.session = self.session, None
        if session:
            await session.close()

    async def fetch_iq_simultaneous(
        self,
        frequency_mhz: float,
        duration_seconds: float,
        sample_rate_khz: float = 12.5,
    ) -> Dict[int, Tuple[Optional[np.ndarray], Optional[str]]]:
        """
        Fetch IQ data simultaneously from all active WebSDRs.

        Args:
            frequency_mhz: Center frequency in MHz
            duration_seconds: Duration of acquisition
            sample_rate_khz: Sample rate in kHz

        Returns:
            Dict mapping WebSDR ID to (IQ data array, error message if any).
            On success the IQ data is a complex64 array with shape
            (num_samples,) and the error is None; on failure the IQ data
            is None and the error is a message string.

        Raises:
            RuntimeError: If called outside the async context manager.
        """
        if not self.session:
            raise RuntimeError("Fetcher not in context manager")

        # Convert units. round() before int() so floating-point error in the
        # multiplication cannot truncate the value one Hz too low.
        frequency_hz = int(round(frequency_mhz * 1e6))
        sample_rate_hz = int(round(sample_rate_khz * 1e3))

        # Calculate expected samples
        expected_samples = int(duration_seconds * sample_rate_hz)

        logger.info(
            "Starting simultaneous fetch from %d WebSDRs",
            len(self.websdrs),
            extra={
                "frequency_mhz": frequency_mhz,
                "duration_seconds": duration_seconds,
                "sample_rate_hz": sample_rate_hz,
                "expected_samples": expected_samples,
            },
        )

        # Snapshot the receivers once. asyncio.gather returns results in the
        # order the awaitables were passed, so the same sequence must be used
        # to build the tasks and to map the results back to receiver IDs.
        receivers = list(self.websdrs.items())

        tasks = [
            self._fetch_from_websdr(
                websdr_id,
                websdr_cfg,
                frequency_hz,
                sample_rate_hz,
                expected_samples,
            )
            for websdr_id, websdr_cfg in receivers
        ]

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=False)

        # Map results back to WebSDR IDs
        iq_data_dict = {}
        for (websdr_id, _), (iq_data, error) in zip(receivers, results):
            iq_data_dict[websdr_id] = (iq_data, error)

            if error:
                logger.warning(
                    "Failed to fetch from WebSDR %d: %s",
                    websdr_id,
                    error,
                )
            else:
                logger.info(
                    "Successfully fetched %d samples from WebSDR %d",
                    len(iq_data) if iq_data is not None else 0,
                    websdr_id,
                )

        return iq_data_dict

    async def _fetch_from_websdr(
        self,
        websdr_id: int,
        websdr_cfg: "WebSDRConfig",
        frequency_hz: int,
        sample_rate_hz: int,
        expected_samples: int,
    ) -> Tuple[Optional[np.ndarray], Optional[str]]:
        """
        Fetch IQ data from a single WebSDR with retries.

        Failed attempts are retried with exponential backoff (1s, 2s, ...).
        The concurrency semaphore is held for the whole retry sequence,
        including the backoff sleeps.

        Returns:
            Tuple of (IQ data array or None, error message or None)
        """
        # Pre-set so the final message is well defined even when
        # retry_count is 0 and the loop body never runs.
        error_msg = "no attempts were made"

        async with self.semaphore:
            for attempt in range(self.retry_count):
                try:
                    iq_data = await self._fetch_single_attempt(
                        websdr_cfg,
                        frequency_hz,
                        sample_rate_hz,
                        expected_samples,
                    )
                except Exception as e:
                    # Some exceptions (e.g. asyncio.TimeoutError) stringify
                    # to "", so fall back to the exception type name.
                    error_msg = str(e) or type(e).__name__
                    logger.warning(
                        "Attempt %d/%d failed for WebSDR %d: %s",
                        attempt + 1,
                        self.retry_count,
                        websdr_id,
                        error_msg,
                    )

                    if attempt < self.retry_count - 1:
                        # Exponential backoff
                        wait_time = 2 ** attempt
                        await asyncio.sleep(wait_time)
                else:
                    return iq_data, None

        return None, f"Failed after {self.retry_count} attempts: {error_msg}"

    @staticmethod
    def _decode_iq(data: bytes) -> np.ndarray:
        """Decode interleaved little-endian int16 I/Q bytes to complex64.

        Each complex sample occupies 4 bytes (2 bytes I, 2 bytes Q). Trailing
        bytes that do not form a whole sample are ignored. Values are scaled
        by 1/32767 so full-scale input maps to roughly [-1, 1].
        """
        usable = len(data) - len(data) % 4
        if usable == 0:
            return np.empty(0, dtype=np.complex64)

        raw = np.frombuffer(memoryview(data)[:usable], dtype="<i2")
        scaled = raw.astype(np.float32) / 32767.0

        iq = np.empty(usable // 4, dtype=np.complex64)
        iq.real = scaled[0::2]
        iq.imag = scaled[1::2]
        return iq

    async def _fetch_single_attempt(
        self,
        websdr_cfg: "WebSDRConfig",
        frequency_hz: int,
        sample_rate_hz: int,
        expected_samples: int,
    ) -> np.ndarray:
        """
        Single fetch attempt from WebSDR.

        Args:
            websdr_cfg: Receiver to query; its ``url`` and ``name`` are used.
            frequency_hz: Value sent as the ``tune`` query parameter.
            sample_rate_hz: Value sent as the ``rate`` query parameter.
            expected_samples: Accepted for interface compatibility; the
                response length is not validated against it.

        Returns:
            complex64 array of the decoded IQ samples.

        Raises:
            RuntimeError: On a non-200 response or an empty payload.
            Exception: Any error raised by the HTTP client.
        """
        url = f"{str(websdr_cfg.url).rstrip('/')}/iq"
        params = {
            "tune": frequency_hz,
            "rate": sample_rate_hz,
        }

        async with self.session.get(url, params=params) as response:
            if response.status != 200:
                raise RuntimeError(
                    f"HTTP {response.status} from {websdr_cfg.name}"
                )

            # Read binary IQ data
            data = await response.read()

        # WebSDR returns interleaved int16 I and Q samples
        iq_complex = self._decode_iq(data)

        if len(iq_complex) == 0:
            raise RuntimeError(f"No IQ data received from {websdr_cfg.name}")

        logger.debug(
            "Received %d samples from %s",
            len(iq_complex),
            websdr_cfg.name,
        )

        return iq_complex

    async def health_check(self) -> Dict[int, bool]:
        """
        Check connectivity to all WebSDR receivers.

        Returns:
            Dict mapping WebSDR ID to True if reachable, False otherwise

        Raises:
            RuntimeError: If called outside the async context manager.
        """
        if not self.session:
            raise RuntimeError("Fetcher not in context manager")

        async def check_single(
            websdr_id: int, websdr_cfg: "WebSDRConfig"
        ) -> Tuple[int, bool]:
            """
            Check if a single WebSDR is online.

            Try HEAD first (fast), fall back to GET if HEAD returns 501.
            A WebSDR is considered ONLINE if:
            - HEAD request returns 200-299 (success)
            - HEAD request returns 300-399 (redirect - still responding)
            - HEAD returns 501 and the follow-up GET returns 200-399

            A WebSDR is OFFLINE if:
            - Connection timeout
            - DNS resolution or any other request error
            - Server returns 4xx/5xx (not 501 on HEAD)
            """
            try:
                # Use SHORT timeout for health checks - 3 seconds max per receiver
                timeout = aiohttp.ClientTimeout(total=3)

                # First try HEAD (faster)
                logger.debug(
                    "Health check: HEAD %s (SDR #%s, timeout=3s)",
                    websdr_cfg.url,
                    websdr_id,
                )
                async with self.session.head(
                    str(websdr_cfg.url),
                    timeout=timeout,
                    allow_redirects=False,
                ) as response:
                    status = response.status
                    # 501 = "Method Not Supported" - server is alive but
                    # doesn't support HEAD; fall through to try GET.
                    if status == 501:
                        logger.debug(
                            "  → 501 (HEAD not supported), trying GET... (SDR #%s)",
                            websdr_id,
                        )
                    # 200-299 = Success
                    elif 200 <= status < 300:
                        logger.info(
                            "  → %s OK (SDR #%s ONLINE)", status, websdr_id
                        )
                        return websdr_id, True
                    # 300-399 = Redirect (server is responding)
                    elif 300 <= status < 400:
                        logger.info(
                            "  → %s Redirect (SDR #%s ONLINE)",
                            status,
                            websdr_id,
                        )
                        return websdr_id, True
                    # 4xx/5xx (except 501) = Error
                    else:
                        logger.warning(
                            "  → %s Error (SDR #%s appears OFFLINE)",
                            status,
                            websdr_id,
                        )
                        return websdr_id, False

                # If we got here, we got 501, try GET
                logger.debug(
                    "Health check: GET %s (SDR #%s, timeout=3s)",
                    websdr_cfg.url,
                    websdr_id,
                )
                async with self.session.get(
                    str(websdr_cfg.url),
                    timeout=timeout,
                    allow_redirects=False,
                ) as response:
                    # Accept 200-399 range as "online"
                    is_online = 200 <= response.status < 400
                    status_msg = "ONLINE" if is_online else "OFFLINE"
                    logger.info(
                        "  → GET %s (SDR #%s %s)",
                        response.status,
                        websdr_id,
                        status_msg,
                    )
                    return websdr_id, is_online

            except asyncio.TimeoutError:
                logger.warning(
                    "Health check timeout for %s (SDR #%s) - marking OFFLINE",
                    websdr_cfg.url,
                    websdr_id,
                )
                return websdr_id, False
            except Exception as e:
                logger.warning(
                    "Health check error for %s (SDR #%s): %s: %s - marking OFFLINE",
                    websdr_cfg.url,
                    websdr_id,
                    type(e).__name__,
                    e,
                )
                return websdr_id, False

        tasks = [
            check_single(wsid, wscfg)
            for wsid, wscfg in self.websdrs.items()
        ]

        results = await asyncio.gather(*tasks, return_exceptions=False)
        return dict(results)