Coverage for services/data-ingestion-web/src/db.py: 64%

28 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-25 16:18 +0000

1""" 

2Database connection and utilities 

3""" 

4import asyncpg 

5from typing import Optional 

6import os 

7import logging 

8 

# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Connection settings: each value is read from the environment, falling back
# to the default given here when the variable is unset.
DB_HOST = os.environ.get("POSTGRES_HOST", "postgres")
DB_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))  # raises ValueError if not numeric
DB_NAME = os.environ.get("POSTGRES_DB", "heimdall")
DB_USER = os.environ.get("POSTGRES_USER", "heimdall_user")
DB_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "changeme")

# Shared connection pool for the whole module; None while no pool exists.
_pool: Optional[asyncpg.Pool] = None

20 

21 

async def get_pool() -> asyncpg.Pool:
    """Return the shared connection pool, creating it on first use.

    Returns:
        The module-level asyncpg pool. When none exists yet, one is created
        from the DB_* settings, stored in ``_pool`` and then returned.
    """
    global _pool

    # Fast path: a pool already exists, so hand it back untouched.
    if _pool is not None:
        return _pool

    logger.info(f"Creating database connection pool to {DB_HOST}:{DB_PORT}/{DB_NAME}")
    pool_settings = {
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "min_size": 2,
        "max_size": 10,
        "command_timeout": 30,
    }
    _pool = await asyncpg.create_pool(**pool_settings)
    logger.info("Database connection pool created successfully")
    return _pool

41 

42 

async def close_pool():
    """Close the shared connection pool, if one exists, and clear the reference.

    Does nothing when no pool exists.
    """
    global _pool

    # Nothing to close.
    if _pool is None:
        return

    logger.info("Closing database connection pool")
    await _pool.close()
    # Forget the closed pool only after close() has completed.
    _pool = None

51 

52 

async def get_connection():
    """Acquire a connection from the shared pool and return it.

    The pool is obtained through get_pool(). The connection is not released
    here; release_connection() hands it back to the pool.
    """
    shared_pool = await get_pool()
    connection = await shared_pool.acquire()
    return connection

57 

58 

async def release_connection(conn):
    """Release a database connection back to the pool.

    Args:
        conn: A connection previously obtained from the shared pool.

    Raises:
        asyncpg.InterfaceError: If no pool currently exists, so there is
            nothing the connection could be released to.
    """
    # Read the module-level pool directly instead of calling get_pool():
    # releasing a connection must never open a brand-new pool as a side effect.
    pool = _pool
    if pool is None:
        raise asyncpg.InterfaceError(
            "cannot release connection: database connection pool is not initialized"
        )
    await pool.release(conn)