Coverage for services/rf-acquisition/src/storage/minio_client.py: 72%
109 statements
1"""MinIO S3 storage client for IQ data."""
3import logging
4import json
5from typing import Dict, Optional, Tuple
6from io import BytesIO
7import numpy as np
8import boto3
9from botocore.exceptions import ClientError, NoCredentialsError
12logger = logging.getLogger(__name__)


class MinIOClient:
    """Client for storing IQ data and metadata in MinIO S3."""

    def __init__(
        self,
        endpoint_url: str,
        access_key: str,
        secret_key: str,
        bucket_name: str = "heimdall-raw-iq",
        region_name: str = "us-east-1",
    ):
        """Initialize MinIO S3 client."""
        self.endpoint_url = endpoint_url
        self.bucket_name = bucket_name
        self.region_name = region_name

        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )

        logger.info(
            "MinIO client initialized - endpoint: %s, bucket: %s",
            endpoint_url,
            bucket_name
        )

    def ensure_bucket_exists(self) -> bool:
        """Ensure the bucket exists, create if not."""
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
            logger.debug("Bucket %s already exists", self.bucket_name)
            return True
        except ClientError as e:
            # The error code is a string and may be non-numeric (e.g. 'NoSuchBucket'),
            # so compare against known values instead of casting to int.
            error_code = e.response['Error']['Code']
            if error_code in ('404', 'NoSuchBucket'):
                try:
                    self.s3_client.create_bucket(Bucket=self.bucket_name)
                    logger.info("Created bucket: %s", self.bucket_name)
                    return True
                except ClientError as create_error:
                    logger.error(
                        "Failed to create bucket %s: %s",
                        self.bucket_name,
                        create_error
                    )
                    return False
            else:
                logger.error(
                    "Error checking bucket %s: %s",
                    self.bucket_name,
                    e
                )
                return False
        except NoCredentialsError:
            logger.error("MinIO credentials not found")
            return False

    def upload_iq_data(
        self,
        iq_data: np.ndarray,
        task_id: str,
        websdr_id: int,
        metadata: Optional[Dict] = None,
    ) -> Tuple[bool, str]:
        """Upload IQ data as .npy file to MinIO."""
        try:
            if not self.ensure_bucket_exists():
                return False, f"Failed to access bucket {self.bucket_name}"

            s3_path = f"sessions/{task_id}/websdr_{websdr_id}.npy"

            # Serialize the array to .npy format in memory.
            buffer = BytesIO()
            np.save(buffer, iq_data)
            buffer.seek(0)
            iq_bytes = buffer.getvalue()

            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_path,
                Body=iq_bytes,
                ContentType='application/octet-stream',
                Metadata={
                    'websdr_id': str(websdr_id),
                    'task_id': task_id,
                    'data_type': 'iq_complex64',
                    'samples_count': str(len(iq_data)),
                }
            )

            logger.info(
                "Uploaded IQ data to s3://%s/%s (%d samples, %.2f MB)",
                self.bucket_name,
                s3_path,
                len(iq_data),
                len(iq_bytes) / (1024 * 1024)
            )

            if metadata:
                metadata_path = f"sessions/{task_id}/websdr_{websdr_id}_metadata.json"
                metadata_json = json.dumps(metadata, indent=2)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=metadata_path,
                    Body=metadata_json.encode('utf-8'),
                    ContentType='application/json',
                    Metadata={
                        'websdr_id': str(websdr_id),
                        'task_id': task_id,
                        'data_type': 'metadata',
                    }
                )

                logger.info(
                    "Uploaded metadata to s3://%s/%s",
                    self.bucket_name,
                    metadata_path
                )

            return True, f"s3://{self.bucket_name}/{s3_path}"

        except ClientError as e:
            error_msg = f"Failed to upload IQ data: {e}"
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            error_msg = f"Unexpected error uploading IQ data: {str(e)}"
            logger.exception(error_msg)
            return False, error_msg

    def download_iq_data(
        self,
        task_id: str,
        websdr_id: int,
    ) -> Tuple[bool, Optional[np.ndarray]]:
        """Download IQ data from MinIO."""
        try:
            s3_path = f"sessions/{task_id}/websdr_{websdr_id}.npy"

            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=s3_path
            )

            buffer = BytesIO(response['Body'].read())
            iq_data = np.load(buffer)

            logger.info(
                "Downloaded IQ data from s3://%s/%s (%d samples)",
                self.bucket_name,
                s3_path,
                len(iq_data)
            )

            return True, iq_data

        except ClientError as e:
            logger.error("Failed to download IQ data: %s", e)
            return False, None
        except Exception as e:
            logger.exception("Unexpected error downloading IQ data: %s", e)
            return False, None

    def get_session_measurements(self, task_id: str) -> Dict[int, Dict]:
        """List all measurements from a session."""
        measurements = {}

        try:
            prefix = f"sessions/{task_id}/"

            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=prefix
            )

            for page in pages:
                if 'Contents' not in page:
                    continue

                for obj in page['Contents']:
                    key = obj['Key']

                    # Only IQ payload objects, not the accompanying metadata JSON.
                    if key.endswith('.npy') and '_metadata' not in key:
                        parts = key.split('/')
                        if parts[-1].startswith('websdr_'):
                            websdr_id = int(
                                parts[-1].replace('websdr_', '').replace('.npy', '')
                            )
                            measurements[websdr_id] = {
                                'iq_data_path': f"s3://{self.bucket_name}/{key}",
                                'size_bytes': obj['Size'],
                                'last_modified': obj['LastModified'].isoformat(),
                            }

            logger.info(
                "Found %d measurements in session %s",
                len(measurements),
                task_id
            )

            return measurements

        except ClientError as e:
            logger.error("Failed to list session measurements: %s", e)
            return {}
        except Exception as e:
            logger.exception("Unexpected error listing measurements: %s", e)
            return {}

    def health_check(self) -> Dict[str, object]:
        """Check MinIO connectivity and bucket access."""
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)

            return {
                'status': 'healthy',
                'endpoint': self.endpoint_url,
                'bucket': self.bucket_name,
                'accessible': True,
            }
        except Exception as e:
            logger.error("MinIO health check failed: %s", e)
            return {
                'status': 'unhealthy',
                'endpoint': self.endpoint_url,
                'bucket': self.bucket_name,
                'accessible': False,
                'error': str(e),
            }
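
A minimal usage sketch follows. The endpoint URL, credentials, task ID, and metadata fields are placeholder values for illustration, not taken from the service configuration.

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Placeholder endpoint and credentials for a local MinIO instance (assumption).
    client = MinIOClient(
        endpoint_url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
    )

    print(client.health_check())

    # Round-trip a small synthetic complex64 IQ capture.
    iq = (np.random.randn(4096) + 1j * np.random.randn(4096)).astype(np.complex64)
    ok, location = client.upload_iq_data(
        iq,
        task_id="example-task",
        websdr_id=1,
        # Hypothetical metadata fields, purely for illustration.
        metadata={"center_freq_hz": 145_500_000, "sample_rate_hz": 48_000},
    )
    print(ok, location)

    ok, restored = client.download_iq_data(task_id="example-task", websdr_id=1)
    if ok:
        print("Samples match:", np.array_equal(iq, restored))

    print(client.get_session_measurements("example-task"))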