Coverage for services/data-ingestion-web/src/db.py: 64%
28 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-25 16:18 +0000
1"""
2Database connection and utilities
3"""
import asyncio
import logging
import os
from typing import Optional

import asyncpg
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Connection settings, read from the environment once at import time.
# Each setting falls back to the literal default when its variable is unset.
DB_HOST = os.environ.get("POSTGRES_HOST", "postgres")
DB_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
DB_NAME = os.environ.get("POSTGRES_DB", "heimdall")
DB_USER = os.environ.get("POSTGRES_USER", "heimdall_user")
DB_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "changeme")

# Shared connection pool: None until get_pool() creates it, and reset to
# None again by close_pool().
_pool: Optional[asyncpg.Pool] = None
# Serializes first-time pool creation. Without it, every coroutine that calls
# get_pool() while the first create_pool() is still awaiting would build its
# own pool, and all but the last one assigned would be leaked unclosed.
_pool_init_lock = asyncio.Lock()


async def get_pool() -> asyncpg.Pool:
    """Return the shared database connection pool, creating it on first use.

    The pool is stored in the module-level ``_pool`` global and reused by
    subsequent calls. Creation is guarded by a lock so that concurrent first
    callers end up sharing a single pool.

    Returns:
        The shared ``asyncpg.Pool``.

    Raises:
        Any exception raised by ``asyncpg.create_pool`` propagates unchanged;
        in that case ``_pool`` stays ``None`` and a later call retries.
    """
    global _pool

    # Fast path: once the pool exists no locking is required.
    if _pool is not None:
        return _pool

    async with _pool_init_lock:
        # Re-check under the lock: another coroutine may have finished
        # creating the pool while this one was waiting.
        if _pool is None:
            logger.info(
                "Creating database connection pool to %s:%s/%s",
                DB_HOST,
                DB_PORT,
                DB_NAME,
            )
            _pool = await asyncpg.create_pool(
                host=DB_HOST,
                port=DB_PORT,
                database=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD,
                min_size=2,
                max_size=10,
                command_timeout=30,
            )
            logger.info("Database connection pool created successfully")

    return _pool
async def close_pool():
    """Close the shared connection pool, if one exists, and forget it.

    Does nothing when no pool has been created. After a successful close the
    module-level ``_pool`` is reset to ``None``.
    """
    global _pool

    # Nothing to close.
    if _pool is None:
        return

    logger.info("Closing database connection pool")
    await _pool.close()
    _pool = None
async def get_connection():
    """Acquire and return a connection from the shared pool.

    The pool is obtained through ``get_pool()``. The caller is responsible
    for handing the connection back with ``release_connection()``.
    """
    shared_pool = await get_pool()
    connection = await shared_pool.acquire()
    return connection
async def release_connection(conn):
    """Hand ``conn`` back to the shared pool.

    Args:
        conn: A connection previously obtained from ``get_connection()``.
    """
    # The pool is looked up through get_pool(), the same accessor used to
    # acquire connections.
    shared_pool = await get_pool()
    await shared_pool.release(conn)