Coverage for services/rf-acquisition/src/tasks/uptime_monitor.py: 18%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""Monitor WebSDR uptime and record to database."""
3import logging
4from datetime import datetime
5from typing import List, Dict
6import asyncio
8from celery import shared_task
9from sqlalchemy import insert
11try:
12 from ..config import settings
13 from ..storage.db_manager import DatabaseManager
14 from ..fetchers.websdr_fetcher import WebSDRFetcher
15 from ..models.db import Base
16except ImportError:
17 # Fallback for direct execution
18 from config import settings
19 from storage.db_manager import DatabaseManager
20 from fetchers.websdr_fetcher import WebSDRFetcher
logger = logging.getLogger(__name__)


# DDL for the heimdall.websdrs_uptime_history TimescaleDB hypertable.
# It is not executed anywhere in this module; presumably it is applied by a
# migration/bootstrap step elsewhere -- confirm before relying on it.
#
# Notes:
# - The primary key is (id, timestamp): TimescaleDB requires every unique
#   index / primary key on a hypertable to include the partitioning column,
#   so a plain `id UUID PRIMARY KEY` makes create_hypertable() fail.
# - uuid_generate_v4() comes from the uuid-ossp extension, which must be
#   installed in the database.
# - Because the statements use IF NOT EXISTS, an already-created table is
#   left unchanged by this DDL.
UPTIME_HISTORY_TABLE = """
CREATE TABLE IF NOT EXISTS heimdall.websdrs_uptime_history (
    id UUID NOT NULL DEFAULT uuid_generate_v4(),
    websdr_id INT NOT NULL,
    websdr_name VARCHAR(255),
    status VARCHAR(20) NOT NULL CHECK (status IN ('online', 'offline')),
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (id, timestamp)
);

SELECT create_hypertable(
    'heimdall.websdrs_uptime_history',
    'timestamp',
    if_not_exists => TRUE
);

CREATE INDEX IF NOT EXISTS idx_websdrs_uptime_history_websdr_time
ON heimdall.websdrs_uptime_history(websdr_id, timestamp DESC);
"""
def get_default_websdrs() -> List[Dict]:
    """Return the built-in list of WebSDR receiver configurations.

    A brand-new list of brand-new dicts is built on every call, so callers
    may mutate the result freely. Every entry is active and shares the same
    probe settings (3 s timeout, 1 retry); only the identity, URL and
    coordinates differ between receivers.
    """
    # (id, name, url, location_name, latitude, longitude)
    receivers = (
        (1, "Aquila di Giaveno", "http://sdr1.ik1jns.it:8076/", "Giaveno, Italy", 45.02, 7.29),
        (2, "Montanaro", "http://cbfenis.ddns.net:43510/", "Montanaro, Italy", 45.234, 7.857),
        (3, "Torino", "http://vst-aero.it:8073/", "Torino, Italy", 45.044, 7.672),
        (4, "Coazze", "http://94.247.189.130:8076/", "Coazze, Italy", 45.03, 7.27),
        (5, "Passo del Giovi", "http://iz1mlt.ddns.net:8074/", "Passo del Giovi, Italy", 44.561, 8.956),
        (6, "Genova", "http://iq1zw.ddns.net:42154/", "Genova, Italy", 44.395, 8.956),
        (7, "Milano - Baggio", "http://iu2mch.duckdns.org:8073/", "Milano (Baggio), Italy", 45.478, 9.123),
    )

    return [
        {
            "id": sdr_id,
            "name": sdr_name,
            "url": sdr_url,
            "location_name": place,
            "latitude": lat,
            "longitude": lon,
            "is_active": True,
            "timeout_seconds": 3,
            "retry_count": 1,
        }
        for sdr_id, sdr_name, sdr_url, place, lat, lon in receivers
    ]
# HTTP statuses meaning "the server answered but refuses HEAD"; for these the
# probe is repeated with GET before deciding whether the WebSDR is online.
_HEAD_UNSUPPORTED_STATUSES = frozenset({405, 501})

# Per-request timeout (seconds) used when a config lacks "timeout_seconds".
_DEFAULT_CHECK_TIMEOUT_SECONDS = 3


async def _check_websdr(session, ws: Dict) -> bool:
    """Probe one WebSDR and return True if it answered with a 2xx/3xx status.

    A HEAD request is sent first; if the server rejects that method
    (405 or 501) a GET is sent instead. Redirects are not followed, so a 3xx
    answer already counts as online. Any exception raised by the request
    (connection error, timeout, ...) is logged and reported as offline.

    Args:
        session: Open aiohttp ClientSession shared by all probes.
        ws: WebSDR config dict with at least 'id', 'name' and 'url';
            'timeout_seconds' is honoured when present.
    """
    import aiohttp

    ws_id = ws['id']
    ws_url = ws['url']
    ws_name = ws['name']
    timeout = aiohttp.ClientTimeout(
        total=ws.get('timeout_seconds') or _DEFAULT_CHECK_TIMEOUT_SECONDS
    )

    try:
        async with session.head(ws_url, timeout=timeout, allow_redirects=False) as resp:
            method, status = 'HEAD', resp.status
        # HEAD response is released before the fallback GET is issued.
        if status in _HEAD_UNSUPPORTED_STATUSES:
            async with session.get(ws_url, timeout=timeout, allow_redirects=False) as resp:
                method, status = 'GET', resp.status
    except Exception as e:
        logger.warning(f" {ws_name} ({ws_id}): Exception {type(e).__name__} → offline")
        return False

    is_online = 200 <= status < 400
    logger.debug(f" {ws_name} ({ws_id}): {method} {status} → {'online' if is_online else 'offline'}")
    return is_online


async def _check_all_websdrs(websdrs: List[Dict]) -> Dict[int, bool]:
    """Probe all WebSDRs concurrently and map each config 'id' to its online flag."""
    import aiohttp

    async with aiohttp.ClientSession() as session:
        # _check_websdr never raises for request errors, so gather() returns
        # one bool per WebSDR in input order.
        online_flags = await asyncio.gather(
            *(_check_websdr(session, ws) for ws in websdrs)
        )
    return {ws['id']: is_online for ws, is_online in zip(websdrs, online_flags)}


def _run_health_checks(websdrs: List[Dict]) -> Dict[int, bool]:
    """Run the async health checks to completion from synchronous task code.

    A private event loop is created and closed here instead of calling
    asyncio.get_event_loop(), which is deprecated when no loop is running and
    raises in threads without a current loop. The thread's current-loop
    setting is left untouched, so other code in the worker is unaffected.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(_check_all_websdrs(websdrs))
    finally:
        loop.close()


def _record_uptime(websdrs: List[Dict], health_results: Dict[int, bool]) -> int:
    """Insert one uptime-history row per WebSDR and return how many were stored.

    Every row of one sweep gets the same timezone-aware UTC timestamp (the
    column is TIMESTAMP WITH TIME ZONE; a naive datetime would be read in the
    DB session's TimeZone). Each INSERT runs inside its own SAVEPOINT so a
    single failure is rolled back on its own instead of aborting the whole
    PostgreSQL transaction, which previously made the final commit silently
    discard every row while still counting them as inserted.
    """
    from datetime import timezone
    from sqlalchemy import text

    query = text("""
        INSERT INTO heimdall.websdrs_uptime_history
        (websdr_id, websdr_name, status, timestamp)
        VALUES (:websdr_id, :websdr_name, :status, :timestamp)
    """)
    checked_at = datetime.now(timezone.utc)
    records_inserted = 0

    db_manager = DatabaseManager()
    with db_manager.get_session() as session:
        for ws_config in websdrs:
            ws_id = ws_config['id']
            ws_name = ws_config['name']
            # A WebSDR missing from the results is recorded as offline.
            status = 'online' if health_results.get(ws_id, False) else 'offline'

            try:
                with session.begin_nested():
                    session.execute(
                        query,
                        {
                            'websdr_id': ws_id,
                            'websdr_name': ws_name,
                            'status': status,
                            'timestamp': checked_at,
                        },
                    )
            except Exception as e:
                logger.error(f"Failed to record uptime for {ws_name}: {e}")
                continue

            records_inserted += 1
            logger.debug(f"Recorded {ws_name} ({ws_id}): {status}")

        session.commit()

    return records_inserted


@shared_task(bind=True, name='monitor_websdrs_uptime')
def monitor_websdrs_uptime(self):
    """
    Celery task: Check WebSDR health and record status to database.

    Intended to be scheduled periodically to build uptime history. Every
    configured WebSDR is probed concurrently over HTTP, then one
    online/offline row per WebSDR is written to
    heimdall.websdrs_uptime_history.

    Returns:
        dict with 'status' ('success'), 'records_inserted' (rows actually
        stored) and 'timestamp' (ISO-8601 UTC completion time).

    Raises:
        Any unexpected exception, after logging it, so Celery marks the task
        as failed. Individual probe or insert failures are logged and do not
        abort the task.
    """
    from datetime import timezone

    try:
        logger.info("Starting WebSDR uptime monitoring task")

        websdrs = get_default_websdrs()

        logger.info(f"Checking health of {len(websdrs)} WebSDRs")
        health_results = _run_health_checks(websdrs)
        logger.info(f"Health check results: {health_results}")

        records_inserted = _record_uptime(websdrs, health_results)
        logger.info(f"Uptime monitoring complete: {records_inserted} records inserted")

        return {
            'status': 'success',
            'records_inserted': records_inserted,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

    except Exception as e:
        logger.exception(f"Error in uptime monitoring: {e}")
        raise
def calculate_uptime_percentage(websdr_id: int, hours: int = 24) -> float:
    """
    Calculate uptime percentage for a WebSDR over the last N hours.

    The result is the share of rows in heimdall.websdrs_uptime_history for
    this WebSDR, timestamped within the window, whose status is 'online'.

    Args:
        websdr_id: ID of the WebSDR
        hours: Time window in hours (default: 24)

    Returns:
        Uptime percentage (0-100). 0.0 is returned when the window contains
        no checks, and also when the query fails (the error is logged rather
        than raised), so 0.0 alone does not distinguish "down" from "unknown".
    """
    try:
        from datetime import timedelta, timezone
        from sqlalchemy import text

        # Timezone-aware cutoff: the column is TIMESTAMP WITH TIME ZONE, and a
        # naive datetime would be interpreted in the DB session's TimeZone
        # rather than UTC, shifting the window by that offset.
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours)

        query = text("""
            SELECT status, COUNT(*) as count
            FROM heimdall.websdrs_uptime_history
            WHERE websdr_id = :websdr_id
              AND timestamp >= :cutoff_time
            GROUP BY status
        """)

        db_manager = DatabaseManager()
        with db_manager.get_session() as session:
            results = session.execute(
                query,
                {'websdr_id': websdr_id, 'cutoff_time': cutoff_time}
            )
            # Materialize while the session is still open.
            status_counts = {status: count for status, count in results}

        total_checks = sum(status_counts.values())
        if total_checks == 0:
            return 0.0

        online_checks = status_counts.get('online', 0)
        uptime_pct = (online_checks / total_checks) * 100

        logger.debug(
            f"SDR {websdr_id}: {online_checks}/{total_checks} online over {hours}h = {uptime_pct:.1f}%"
        )

        return uptime_pct

    except Exception as e:
        logger.error(f"Error calculating uptime for SDR {websdr_id}: {e}")
        return 0.0