Coverage for services/rf-acquisition/src/fetchers/websdr_fetcher.py: 71%
101 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""WebSDR data fetcher for simultaneous IQ acquisition from multiple receivers."""
3import asyncio
4import logging
5import struct
6from datetime import datetime
7from typing import Dict, List, Optional, Tuple
8import aiohttp
9import numpy as np
11from ..models.websdrs import WebSDRConfig, IQDataPoint
13logger = logging.getLogger(__name__)
16class WebSDRFetcher:
17 """Fetch IQ data simultaneously from multiple WebSDR receivers."""
19 # WebSDR API endpoint pattern: http://<host>:<port>/iq
20 # Request parameters: tune=<freq_hz>, rate=<sample_rate_hz>
21 # Response: binary IQ data (alternating int16 I and Q samples)
23 def __init__(
24 self,
25 websdrs: List[WebSDRConfig],
26 timeout: int = 30,
27 retry_count: int = 3,
28 concurrent_limit: int = 7,
29 ):
30 """
31 Initialize WebSDR fetcher.
33 Args:
34 websdrs: List of WebSDRConfig objects
35 timeout: Request timeout in seconds
36 retry_count: Number of retries per request
37 concurrent_limit: Max concurrent requests
38 """
39 self.websdrs = {ws.id: ws for ws in websdrs if ws.is_active}
40 self.timeout = aiohttp.ClientTimeout(total=timeout)
41 self.retry_count = retry_count
42 self.concurrent_limit = concurrent_limit
43 self.session: Optional[aiohttp.ClientSession] = None
44 self.semaphore = asyncio.Semaphore(concurrent_limit)
46 async def __aenter__(self):
47 """Context manager entry."""
48 connector = aiohttp.TCPConnector(
49 limit=self.concurrent_limit,
50 limit_per_host=2,
51 ttl_dns_cache=300
52 )
53 self.session = aiohttp.ClientSession(
54 connector=connector,
55 timeout=self.timeout
56 )
57 return self
59 async def __aexit__(self, exc_type, exc_val, exc_tb):
60 """Context manager exit."""
61 if self.session:
62 await self.session.close()
64 async def fetch_iq_simultaneous(
65 self,
66 frequency_mhz: float,
67 duration_seconds: float,
68 sample_rate_khz: float = 12.5,
69 ) -> Dict[int, Tuple[np.ndarray, Optional[str]]]:
70 """
71 Fetch IQ data simultaneously from all active WebSDRs.
73 Args:
74 frequency_mhz: Center frequency in MHz
75 duration_seconds: Duration of acquisition
76 sample_rate_khz: Sample rate in kHz
78 Returns:
79 Dict mapping WebSDR ID to (IQ data array, error message if any)
80 IQ data is complex64 array with shape (num_samples,)
81 """
82 if not self.session:
83 raise RuntimeError("Fetcher not in context manager")
85 # Convert frequencies
86 frequency_hz = int(frequency_mhz * 1e6)
87 sample_rate_hz = int(sample_rate_khz * 1e3)
89 # Calculate expected samples
90 expected_samples = int(duration_seconds * sample_rate_hz)
92 logger.info(
93 "Starting simultaneous fetch from %d WebSDRs",
94 len(self.websdrs),
95 extra={
96 "frequency_mhz": frequency_mhz,
97 "duration_seconds": duration_seconds,
98 "sample_rate_hz": sample_rate_hz,
99 "expected_samples": expected_samples
100 }
101 )
103 # Create fetch tasks for all receivers
104 tasks = [
105 self._fetch_from_websdr(
106 websdr_id,
107 websdr_cfg,
108 frequency_hz,
109 sample_rate_hz,
110 expected_samples
111 )
112 for websdr_id, websdr_cfg in self.websdrs.items()
113 ]
115 # Run all tasks concurrently
116 results = await asyncio.gather(*tasks, return_exceptions=False)
118 # Map results back to WebSDR IDs
119 iq_data_dict = {}
120 for (websdr_id, _), result in zip(
121 sorted(self.websdrs.items(), key=lambda x: x[0]),
122 results
123 ):
124 iq_data, error = result
125 iq_data_dict[websdr_id] = (iq_data, error)
127 if error:
128 logger.warning(
129 "Failed to fetch from WebSDR %d: %s",
130 websdr_id,
131 error
132 )
133 else:
134 logger.info(
135 "Successfully fetched %d samples from WebSDR %d",
136 len(iq_data) if iq_data is not None else 0,
137 websdr_id
138 )
140 return iq_data_dict
142 async def _fetch_from_websdr(
143 self,
144 websdr_id: int,
145 websdr_cfg: WebSDRConfig,
146 frequency_hz: int,
147 sample_rate_hz: int,
148 expected_samples: int,
149 ) -> Tuple[Optional[np.ndarray], Optional[str]]:
150 """
151 Fetch IQ data from a single WebSDR with retries.
153 Returns:
154 Tuple of (IQ data array or None, error message or None)
155 """
156 async with self.semaphore:
157 for attempt in range(self.retry_count):
158 try:
159 return await self._fetch_single_attempt(
160 websdr_cfg,
161 frequency_hz,
162 sample_rate_hz,
163 expected_samples
164 ), None
165 except Exception as e:
166 error_msg = str(e)
167 logger.warning(
168 "Attempt %d/%d failed for WebSDR %d: %s",
169 attempt + 1,
170 self.retry_count,
171 websdr_id,
172 error_msg
173 )
175 if attempt < self.retry_count - 1:
176 # Exponential backoff
177 wait_time = 2 ** attempt
178 await asyncio.sleep(wait_time)
180 return None, f"Failed after {self.retry_count} attempts: {error_msg}"
182 async def _fetch_single_attempt(
183 self,
184 websdr_cfg: WebSDRConfig,
185 frequency_hz: int,
186 sample_rate_hz: int,
187 expected_samples: int,
188 ) -> np.ndarray:
189 """
190 Single fetch attempt from WebSDR.
192 Raises:
193 Exception: If fetch fails
194 """
195 url = f"{str(websdr_cfg.url).rstrip('/')}/iq"
196 params = {
197 "tune": frequency_hz,
198 "rate": sample_rate_hz,
199 }
201 async with self.session.get(url, params=params) as response:
202 if response.status != 200:
203 raise RuntimeError(
204 f"HTTP {response.status} from {websdr_cfg.name}"
205 )
207 # Read binary IQ data
208 data = await response.read()
210 # WebSDR returns interleaved int16 I and Q samples
211 # Each sample is 4 bytes (2 bytes I, 2 bytes Q)
212 iq_samples = struct.unpack(f'<{len(data)//4}h', data[:len(data)//4*4])
214 # Reshape to pairs and convert to complex
215 iq_array = np.array(iq_samples, dtype=np.int16)
217 # Normalize to [-1, 1] range and convert to complex
218 iq_normalized = iq_array.astype(np.float32) / 32767.0
219 iq_complex = iq_normalized[0::2] + 1j * iq_normalized[1::2]
221 if len(iq_complex) == 0:
222 raise RuntimeError(f"No IQ data received from {websdr_cfg.name}")
224 logger.debug(
225 "Received %d samples from %s",
226 len(iq_complex),
227 websdr_cfg.name
228 )
230 return iq_complex.astype(np.complex64)
232 async def health_check(self) -> Dict[int, bool]:
233 """
234 Check connectivity to all WebSDR receivers.
236 Returns:
237 Dict mapping WebSDR ID to True if reachable, False otherwise
238 """
239 if not self.session:
240 raise RuntimeError("Fetcher not in context manager")
242 async def check_single(websdr_id: int, websdr_cfg: WebSDRConfig) -> Tuple[int, bool]:
243 """
244 Check if a single WebSDR is online.
246 Try HEAD first (fast), fall back to GET if HEAD fails with 501.
247 A WebSDR is considered ONLINE if:
248 - HEAD request returns 200-299 (success)
249 - GET request returns 200-299 (success)
250 - GET request returns 301-399 (redirect - still responding)
251 - HEAD returns 501 (method not supported - server is responding!)
253 A WebSDR is OFFLINE if:
254 - Connection timeout
255 - DNS resolution fails
256 - Server returns 4xx/5xx (not 501)
257 """
258 try:
259 # Use SHORT timeout for health checks - 3 seconds max per receiver
260 timeout = aiohttp.ClientTimeout(total=3)
262 # First try HEAD (faster)
263 logger.debug(f"Health check: HEAD {websdr_cfg.url} (SDR #{websdr_id}, timeout=3s)")
264 async with self.session.head(
265 str(websdr_cfg.url),
266 timeout=timeout,
267 allow_redirects=False
268 ) as response:
269 # 501 = "Method Not Supported" - server is alive but doesn't support HEAD
270 if response.status == 501:
271 logger.debug(f" → 501 (HEAD not supported), trying GET... (SDR #{websdr_id})")
272 # Fall through to try GET
273 # 200-299 = Success
274 elif 200 <= response.status < 300:
275 logger.info(f" → {response.status} OK (SDR #{websdr_id} ONLINE)")
276 return websdr_id, True
277 # 300-399 = Redirect (server is responding)
278 elif 300 <= response.status < 400:
279 logger.info(f" → {response.status} Redirect (SDR #{websdr_id} ONLINE)")
280 return websdr_id, True
281 # 4xx/5xx (except 501) = Error
282 else:
283 logger.warning(f" → {response.status} Error (SDR #{websdr_id} appears OFFLINE)")
284 return websdr_id, False
286 # If we got here, we got 501, try GET
287 logger.debug(f"Health check: GET {websdr_cfg.url} (SDR #{websdr_id}, timeout=3s)")
288 async with self.session.get(
289 str(websdr_cfg.url),
290 timeout=timeout,
291 allow_redirects=False
292 ) as response:
293 # Accept 200-399 range as "online"
294 is_online = 200 <= response.status < 400
295 status_msg = "ONLINE" if is_online else "OFFLINE"
296 logger.info(f" → GET {response.status} (SDR #{websdr_id} {status_msg})")
297 return websdr_id, is_online
299 except asyncio.TimeoutError:
300 logger.warning(f"Health check timeout for {websdr_cfg.url} (SDR #{websdr_id}) - marking OFFLINE")
301 return websdr_id, False
302 except Exception as e:
303 logger.warning(f"Health check error for {websdr_cfg.url} (SDR #{websdr_id}): {type(e).__name__}: {e} - marking OFFLINE")
304 return websdr_id, False
306 tasks = [
307 check_single(wsid, wscfg)
308 for wsid, wscfg in self.websdrs.items()
309 ]
311 results = await asyncio.gather(*tasks, return_exceptions=False)
312 return dict(results)