Coverage for services/rf-acquisition/src/storage/minio_client.py: 72%

109 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1"""MinIO S3 storage client for IQ data.""" 

2 

3import logging 

4import json 

5from typing import Dict, Optional, Tuple 

6from io import BytesIO 

7import numpy as np 

8import boto3 

9from botocore.exceptions import ClientError, NoCredentialsError 

10 

11 

12logger = logging.getLogger(__name__) 

13 

14 

class MinIOClient:
    """Client for storing IQ data and metadata in MinIO S3.

    Objects are laid out under ``sessions/{task_id}/``:

    * ``websdr_{websdr_id}.npy`` holds the IQ samples (NumPy ``.npy`` format).
    * ``websdr_{websdr_id}_metadata.json`` holds the optional metadata.
    """

    # Error codes treated as "bucket does not exist". The code is compared as
    # a string because S3-style services may report either the HTTP status
    # ("404") or a symbolic code ("NoSuchBucket"); converting it with int()
    # would raise ValueError on symbolic codes such as "AccessDenied".
    _NOT_FOUND_CODES = frozenset({'404', 'NoSuchBucket'})

    # File-name pattern of IQ objects written by upload_iq_data().
    _IQ_PREFIX = 'websdr_'
    _IQ_SUFFIX = '.npy'

    def __init__(
        self,
        endpoint_url: str,
        access_key: str,
        secret_key: str,
        bucket_name: str = "heimdall-raw-iq",
        region_name: str = "us-east-1",
    ):
        """Initialize MinIO S3 client.

        Args:
            endpoint_url: URL of the MinIO/S3 endpoint.
            access_key: Access key passed to boto3 as ``aws_access_key_id``.
            secret_key: Secret key passed to boto3 as
                ``aws_secret_access_key``.
            bucket_name: Bucket used by every operation of this client.
            region_name: Region name passed to boto3.
        """
        self.endpoint_url = endpoint_url
        self.bucket_name = bucket_name
        self.region_name = region_name

        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )

        # Credentials are deliberately left out of the log line.
        logger.info(
            "MinIO client initialized - endpoint: %s, bucket: %s",
            endpoint_url,
            bucket_name
        )

    def ensure_bucket_exists(self) -> bool:
        """Ensure the bucket exists, create if not.

        Returns:
            True if the bucket already exists or was created, False if the
            existence check failed for any reason other than "not found",
            if credentials are missing, or if creation failed.
        """
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
        except ClientError as e:
            error_code = str(e.response.get('Error', {}).get('Code', ''))
            if error_code not in self._NOT_FOUND_CODES:
                logger.error(
                    "Error checking bucket %s: %s",
                    self.bucket_name,
                    e
                )
                return False
        except NoCredentialsError:
            logger.error("MinIO credentials not found")
            return False
        else:
            logger.debug("Bucket %s already exists", self.bucket_name)
            return True

        # Only reached when the bucket was reported as missing.
        try:
            self.s3_client.create_bucket(Bucket=self.bucket_name)
        except ClientError as create_error:
            logger.error(
                "Failed to create bucket %s: %s",
                self.bucket_name,
                create_error
            )
            return False

        logger.info("Created bucket: %s", self.bucket_name)
        return True

    def upload_iq_data(
        self,
        iq_data: np.ndarray,
        task_id: str,
        websdr_id: int,
        metadata: Optional[Dict] = None,
    ) -> Tuple[bool, str]:
        """Upload IQ data as .npy file to MinIO.

        Args:
            iq_data: Samples to store; serialized with ``np.save``.
            task_id: Session identifier; becomes part of the object key.
            websdr_id: Receiver identifier; becomes part of the object key.
            metadata: Optional JSON-serializable dict. When truthy it is
                uploaded next to the IQ object as
                ``websdr_{websdr_id}_metadata.json``.

        Returns:
            ``(True, "s3://bucket/key")`` on success, otherwise
            ``(False, error_message)``. No exception is propagated.
        """
        try:
            if not self.ensure_bucket_exists():
                return False, f"Failed to access bucket {self.bucket_name}"

            s3_path = f"sessions/{task_id}/websdr_{websdr_id}.npy"

            buffer = BytesIO()
            np.save(buffer, iq_data)
            # getvalue() returns the whole buffer regardless of the current
            # position, so no seek is required.
            iq_bytes = buffer.getvalue()

            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_path,
                Body=iq_bytes,
                ContentType='application/octet-stream',
                Metadata={
                    'websdr_id': str(websdr_id),
                    'task_id': task_id,
                    # NOTE(review): this label is fixed and does not reflect
                    # iq_data.dtype - confirm callers always pass complex64.
                    'data_type': 'iq_complex64',
                    'samples_count': str(len(iq_data)),
                }
            )

            logger.info(
                "Uploaded IQ data to s3://%s/%s (%d samples, %.2f MB)",
                self.bucket_name,
                s3_path,
                len(iq_data),
                # Payload size; __sizeof__() would add object overhead.
                len(iq_bytes) / (1024 * 1024)
            )

            if metadata:
                metadata_path = f"sessions/{task_id}/websdr_{websdr_id}_metadata.json"
                metadata_json = json.dumps(metadata, indent=2)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=metadata_path,
                    Body=metadata_json.encode('utf-8'),
                    ContentType='application/json',
                    Metadata={
                        'websdr_id': str(websdr_id),
                        'task_id': task_id,
                        'data_type': 'metadata',
                    }
                )

                logger.info(
                    "Uploaded metadata to s3://%s/%s",
                    self.bucket_name,
                    metadata_path
                )

            return True, f"s3://{self.bucket_name}/{s3_path}"

        except ClientError as e:
            error_msg = f"Failed to upload IQ data: {e}"
            logger.error(error_msg)
            return False, error_msg
        except Exception as e:
            # Boundary handler: callers receive (False, message) rather than
            # an exception; the traceback is preserved in the log.
            error_msg = f"Unexpected error uploading IQ data: {str(e)}"
            logger.exception(error_msg)
            return False, error_msg

    def download_iq_data(
        self,
        task_id: str,
        websdr_id: int,
    ) -> Tuple[bool, Optional[np.ndarray]]:
        """Download IQ data from MinIO.

        Args:
            task_id: Session identifier used when the data was uploaded.
            websdr_id: Receiver identifier used when the data was uploaded.

        Returns:
            ``(True, array)`` on success, ``(False, None)`` on any failure.
            No exception is propagated.
        """
        try:
            s3_path = f"sessions/{task_id}/websdr_{websdr_id}.npy"

            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=s3_path
            )

            buffer = BytesIO(response['Body'].read())
            # Explicitly refuse pickled payloads: the object comes from
            # external storage and must not be able to execute code on load.
            iq_data = np.load(buffer, allow_pickle=False)

            logger.info(
                "Downloaded IQ data from s3://%s/%s (%d samples)",
                self.bucket_name,
                s3_path,
                len(iq_data)
            )

            return True, iq_data

        except ClientError as e:
            logger.error("Failed to download IQ data: %s", e)
            return False, None
        except Exception as e:
            logger.exception("Unexpected error downloading IQ data: %s", e)
            return False, None

    @staticmethod
    def _parse_websdr_id(key: str) -> Optional[int]:
        """Return the WebSDR id encoded in an IQ object key, or None.

        Only the final path component is inspected and it must look exactly
        like ``websdr_<int>.npy``. Metadata objects, unrelated files and
        names whose id part is not an integer yield None.
        """
        filename = key.rsplit('/', 1)[-1]
        prefix = MinIOClient._IQ_PREFIX
        suffix = MinIOClient._IQ_SUFFIX
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            return None
        try:
            return int(filename[len(prefix):-len(suffix)])
        except ValueError:
            return None

    def get_session_measurements(self, task_id: str) -> Dict[int, Dict]:
        """List all measurements from a session.

        Args:
            task_id: Session identifier whose ``sessions/{task_id}/`` prefix
                is listed.

        Returns:
            Mapping of WebSDR id to a dict with ``iq_data_path``,
            ``size_bytes`` and ``last_modified`` (ISO 8601 string). Objects
            that are not IQ files are skipped. An empty dict is returned when
            listing fails.
        """
        measurements = {}

        try:
            prefix = f"sessions/{task_id}/"

            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=prefix
            )

            for page in pages:
                # Pages without matching objects carry no 'Contents' entry.
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    websdr_id = self._parse_websdr_id(key)
                    if websdr_id is None:
                        # Skip stray objects instead of failing the listing.
                        continue
                    measurements[websdr_id] = {
                        'iq_data_path': f"s3://{self.bucket_name}/{key}",
                        'size_bytes': obj['Size'],
                        'last_modified': obj['LastModified'].isoformat(),
                    }

            logger.info(
                "Found %d measurements in session %s",
                len(measurements),
                task_id
            )

            return measurements

        except ClientError as e:
            logger.error("Failed to list session measurements: %s", e)
            return {}
        except Exception as e:
            logger.exception("Unexpected error listing measurements: %s", e)
            return {}

    def health_check(self) -> Dict[str, object]:
        """Check MinIO connectivity and bucket access.

        Returns:
            Dict with ``status`` ('healthy' or 'unhealthy'), ``endpoint``,
            ``bucket`` and ``accessible`` (bool). On failure an ``error``
            string is included as well. No exception is propagated.
        """
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)

            return {
                'status': 'healthy',
                'endpoint': self.endpoint_url,
                'bucket': self.bucket_name,
                'accessible': True,
            }
        except Exception as e:
            # A health probe must always report a result, never raise.
            logger.error("MinIO health check failed: %s", e)
            return {
                'status': 'unhealthy',
                'endpoint': self.endpoint_url,
                'bucket': self.bucket_name,
                'accessible': False,
                'error': str(e),
            }

249import numpy as np 

250import boto3 

251from botocore.exceptions import ClientError, NoCredentialsError 

252 

253 

254logger = logging.getLogger(__name__)