
NOTICE: All information contained herein is, and remains, the property of TechnoCore. The intellectual and technical concepts contained herein are proprietary to TechnoCore, and dissemination of this information or reproduction of this material is strictly forbidden unless prior written permission is obtained from TechnoCore.
# ObjConnectionPool

ObjConnectionPool is a production-ready, thread-safe database connection pooling implementation designed to reduce connection overhead and prevent connection exhaustion in multi-threaded environments within the Axion framework.
Current Version: 8.2
Connection pooling is essential for applications that need to maintain multiple database connections efficiently. ObjConnectionPool provides a robust solution that manages connection lifecycle, validates connection health, and automatically handles connection recycling.
- pymysql (Recommended)
- mysql-connector-python
All operations are thread-safe.
```python
from ObjConnectionPool import ConnectionPool

# Create a connection pool
pool = ConnectionPool(size=10, config_name="primary")

# Use with context manager (recommended)
with pool as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    results = cursor.fetchall()
    cursor.close()
# Connection automatically returned to pool
```
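Because acquire and release are thread-safe, a single pool can be shared by many worker threads. The following self-contained sketch illustrates the pattern with a minimal stand-in pool built on `queue.Queue`; `MiniPool` and `FakeConnection` are placeholders for illustration, not part of the library.

```python
import threading
from queue import Queue

class MiniPool:
    """Minimal stand-in pool: queue.Queue supplies the thread-safe
    blocking get/put that a real connection pool relies on."""
    def __init__(self, size, factory):
        self._q = Queue()
        for _ in range(size):
            self._q.put(factory())

    def acquire(self, timeout=None):
        return self._q.get(timeout=timeout)  # blocks until a connection is free

    def release(self, conn):
        self._q.put(conn)

class FakeConnection:
    """Placeholder connection that counts how often it was borrowed."""
    def __init__(self):
        self.uses = 0

pool = MiniPool(size=3, factory=FakeConnection)
results = []

def worker():
    # Each worker borrows a connection, uses it, and returns it.
    conn = pool.acquire(timeout=5)
    try:
        conn.uses += 1
        results.append(conn)
    finally:
        pool.release(conn)

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All 10 tasks completed while the pool held at most 3 connections
print(len(results), len({id(c) for c in results}))
```

The key property, which the real pool shares, is that a blocked `acquire` simply waits until another thread releases a connection.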
```python
# Create pool with pymysql driver
pool = ConnectionPool(size=10, driver="pymysql")

# Manually acquire connection
conn = pool.acquire()
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    results = cursor.fetchall()
    cursor.close()
finally:
    # Always release in finally block
    pool.release(conn)
```
```python
# Use mysql-connector driver
pool = ConnectionPool(size=10, driver="mysql-connector")

with pool as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM products")
    count = cursor.fetchone()[0]
    cursor.close()
```
```python
from ObjConnectionPool import get_global_pool

# Create/get global pool instance
pool = get_global_pool(size=20, driver="pymysql")

# Subsequent calls return the same pool
same_pool = get_global_pool()  # Returns existing pool
```
```python
# Production-ready configuration
pool = ConnectionPool(
    size=20,                # Base pool size
    config_name="primary",  # Database config name
    timeout=30.0,           # Acquisition timeout
    max_overflow=10,        # Allow 10 extra connections
    recycle=1800,           # Recycle connections after 30 minutes
    driver="pymysql"        # Use pymysql driver
)
```
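The `recycle` setting guards against server-side idle timeouts by closing and replacing connections older than `recycle` seconds on their next acquisition. The age check can be illustrated with this minimal, self-contained sketch (`TimestampedConn` and `maybe_recycle` are illustrative stand-ins, not the library's internals):

```python
import time

class TimestampedConn:
    """Stand-in connection that records its creation time."""
    def __init__(self):
        self.created_at = time.monotonic()
        self.closed = False

    def close(self):
        self.closed = True

def maybe_recycle(conn, recycle_seconds, now=None):
    """Return the same connection if it is young enough; otherwise close
    it and hand back a fresh one. recycle_seconds=0 disables the check."""
    now = time.monotonic() if now is None else now
    if recycle_seconds and now - conn.created_at >= recycle_seconds:
        conn.close()
        return TimestampedConn()
    return conn

old = TimestampedConn()
# Simulate a connection that is 2000 seconds old with recycle=1800
fresh = maybe_recycle(old, 1800, now=old.created_at + 2000)
# The stale connection was closed and replaced by a new one
print(old.closed, fresh is old)
```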
| Application Size | Base Size | Max Overflow |
|---|---|---|
| Small | 5-10 | 5 |
| Medium | 10-20 | 10 |
| Large | 20-50 | 20 |
| Traffic Pattern | Recycle Time |
|---|---|
| High-traffic | 1800s (30 min) |
| Default | 3600s (1 hour) |
| Low-traffic | 7200s (2 hours) |
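The guidance in the two tables above can be collected into a small helper. The tier names and the mid-range defaults chosen below are illustrative, not part of the library:

```python
# Illustrative sizing helper based on the guideline tables above.
SIZING = {
    "small":  {"size": 8,  "max_overflow": 5},   # base 5-10
    "medium": {"size": 15, "max_overflow": 10},  # base 10-20
    "large":  {"size": 35, "max_overflow": 20},  # base 20-50
}

RECYCLE = {
    "high":    1800,  # high-traffic: 30 min
    "default": 3600,  # default: 1 hour
    "low":     7200,  # low-traffic: 2 hours
}

def pool_settings(app_size="medium", traffic="default"):
    """Combine the sizing and recycle guidelines into ConnectionPool kwargs."""
    return {**SIZING[app_size], "recycle": RECYCLE[traffic]}

print(pool_settings("large", "high"))
```

The returned dictionary could then be splatted into the constructor, e.g. `ConnectionPool(**pool_settings("large", "high"))`.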
Use pymysql when:
Use mysql-connector when:
### ConnectionPool

Thread-safe connection pool for database connections.
```python
ConnectionPool(
    size: int = 10,
    config_name: str = "primary",
    timeout: float = 30.0,
    max_overflow: int = 5,
    recycle: int = 3600,
    driver: str = "pymysql"
)
```
Parameters:

- `size`: Number of connections to maintain in the pool
- `config_name`: Database configuration name from config.yaml
- `timeout`: Max seconds to wait for connection (default: 30)
- `max_overflow`: Max connections beyond pool size (default: 5)
- `recycle`: Seconds before recycling connection (default: 3600)
- `driver`: MySQL driver to use ("pymysql" or "mysql-connector")

### `acquire(timeout=None, max_retries=3)`

Acquire a connection from the pool with validation and retry logic.
Parameters:

- `timeout`: Max seconds to wait (None = use pool default)
- `max_retries`: Maximum number of retries for corrupted connections

Returns: Database connection object

Raises: `queue.Empty` if no connection is available within the timeout
### `release(conn)`

Return a connection to the pool.

Parameters:

- `conn`: Connection to return

If the pool is full, overflow connections are closed instead of being returned.
### `close_all()`

Close all connections in the pool.

Warning: This will close connections even if in use!
### `get_stats()`

Get pool statistics.

Returns: Dictionary with pool statistics:

- `created`: Number of connections created
- `recycled`: Number of connections recycled
- `acquired`: Number of connection acquisitions
- `released`: Number of connection releases
- `timeouts`: Number of acquisition timeouts
- `total_connections`: Current total connections (in pool + in use)
- `pool_size`: Current number of connections in pool
- `max_size`: Maximum pool size (size + max_overflow)

### `get_global_pool`

```python
get_global_pool(
    size: int = 10,
    config_name: str = "primary",
    timeout: float = 30.0,
    max_overflow: int = 5,
    recycle: int = 3600,
    driver: str = "pymysql"
) -> ConnectionPool
```
Get or create the global connection pool.
Note: Parameters are only used when creating a new pool. Subsequent calls return the existing pool regardless of parameters provided.
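The note above describes classic singleton behavior, which can be illustrated with a minimal double-checked locking sketch (`DummyPool` and `get_pool` are illustrative stand-ins, not the library's code):

```python
import threading

_pool = None
_lock = threading.Lock()

class DummyPool:
    """Stand-in pool that only remembers its configured size."""
    def __init__(self, size):
        self.size = size

def get_pool(size=10):
    """Create the pool on first call; later calls ignore parameters."""
    global _pool
    if _pool is None:
        with _lock:               # lock so two threads can't both create it
            if _pool is None:     # re-check after acquiring the lock
                _pool = DummyPool(size)
    return _pool

first = get_pool(size=20)
second = get_pool(size=99)  # size=99 is ignored - the existing pool is returned
print(first is second, first.size)
```

The inner re-check under the lock is what makes first-call creation safe when multiple threads race to initialize the global pool.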
Symptom: `RuntimeError: Connection pool exhausted`

Solutions: increase the pool capacity (`size` and/or `max_overflow`), and verify that every acquired connection is released (see the leak-prevention examples below).

Example:

```python
# Increase pool capacity
pool = ConnectionPool(size=20, max_overflow=15)
```
Symptom: `Empty: Connection pool timeout after Xs`

Solutions: increase the acquisition `timeout`, or enlarge the pool so connections become available sooner.

Example:

```python
# Increase timeout
pool = ConnectionPool(size=10, timeout=60.0)
```
Symptom: Intermittent validation failures

Causes: typically connections dropped by the server (e.g. after idle timeouts) or transient network interruptions.

Behavior: the pool automatically retries and replaces bad connections. This is normal behavior for long-running applications.
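The validate-and-replace behavior described above can be sketched with stand-in connections; `StubConn` and `acquire_validated` are illustrative, and the real pool's internals may differ:

```python
from queue import Queue, Empty

class StubConn:
    """Stand-in connection with a controllable health check."""
    def __init__(self, healthy=True):
        self.healthy = healthy
        self.closed = False

    def ping(self):
        # Stand-in for a driver-level liveness check
        return self.healthy

    def close(self):
        self.closed = True

def acquire_validated(q, make_conn, max_retries=3):
    """Pop connections off the pool, discarding dead ones; create a
    fresh connection if the pool empties or retries run out."""
    for _ in range(max_retries):
        try:
            conn = q.get_nowait()
        except Empty:
            break
        if conn.ping():
            return conn
        conn.close()  # discard the corrupted connection and retry
    return make_conn()

pool_q = Queue()
dead = StubConn(healthy=False)
live = StubConn(healthy=True)
pool_q.put(dead)
pool_q.put(live)

conn = acquire_validated(pool_q, lambda: StubConn())
# The dead connection was closed and skipped; the live one was returned
print(conn is live, dead.closed)
```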
Prevention:
Always release connections using try/finally or context managers:
```python
# Good - using context manager
with pool as conn:
    # Use connection
    pass

# Good - using try/finally
conn = pool.acquire()
try:
    # Use connection
    pass
finally:
    pool.release(conn)

# Bad - connection may leak on exception
conn = pool.acquire()
# Use connection
pool.release(conn)  # May not execute if exception occurs
```
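The context-manager guarantee, that release happens even when an exception escapes the block, rests on `__exit__` running during stack unwinding. A minimal self-contained sketch of that protocol (`PoolCM` is illustrative, not the library's implementation):

```python
class PoolCM:
    """Tiny pool-like object whose context manager always releases."""
    def __init__(self):
        self.released = 0

    def acquire(self):
        return object()

    def release(self, conn):
        self.released += 1

    def __enter__(self):
        self._conn = self.acquire()
        return self._conn

    def __exit__(self, exc_type, exc, tb):
        self.release(self._conn)  # runs even if the body raised
        return False              # don't swallow the exception

pool = PoolCM()
try:
    with pool as conn:
        raise RuntimeError("query failed")
except RuntimeError:
    pass

# The connection was released despite the exception
print(pool.released)
```

Returning `False` from `__exit__` lets the original exception propagate to the caller after the release has already happened.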
The connection pool is integrated with ObjData to provide efficient connection management for remote database operations. ObjData uses thread-local storage to cache remote connections, which are backed by the connection pool.
```python
from ObjData import ObjData

# ObjData automatically uses connection pool for remote connections
data = ObjData()

# Remote connection is pooled
remote_conn = data.db_connect_remote("secondary")

# Use connection
cursor = remote_conn.cursor()
cursor.execute("SELECT * FROM remote_table")
results = cursor.fetchall()
cursor.close()
```
The test cases for this object are located in `resource.test/pytests/factory.core/test_ObjConnectionPool.py`.
The comprehensive test suite validates all aspects of the connection pool implementation across 22 test cases organized into 8 test classes:
All 22 tests PASSED ✅
- `test_release_connection` - Validates connection release back to pool
- `test_acquire_release_multiple` - Tests multiple acquire/release cycles
- `test_pool_size_limit` - Verifies pool respects size limit
- `test_overflow_connections` - Tests overflow connection creation
- `test_overflow_limit_reached` - Validates exception when overflow limit reached
- `test_acquire_succeeds_after_release` - Tests timeout and retry behavior
- `test_connection_recycling_disabled` - Validates recycling disabled when recycle=0
- `test_young_connection_not_recycled` - Ensures young connections aren't recycled
- `test_concurrent_acquire_release` - Validates thread safety with 10 concurrent workers
- `test_concurrent_context_manager` - Tests concurrent context manager usage
- `test_no_connection_leaks` - Verifies no connection leaks with 10 threads
- `test_threadpool_executor` - Tests pool with ThreadPoolExecutor and 100 tasks
- `test_stats_creation_tracking` - Validates connection creation tracking
- `test_stats_acquire_release_tracking` - Tests acquire/release statistics
- `test_stats_timeout_tracking` - Verifies timeout tracking
- `test_stats_pool_size` - Validates pool size statistics
- `test_release_none_connection` - Tests graceful handling of None release
- `test_close_all_connections` - Validates closing all connections
- `test_acquire_after_close_all` - Tests pool recovery after close_all
- `test_get_global_pool_creates_once` - Validates singleton pattern
- `test_get_global_pool_thread_safe` - Tests thread-safe global pool creation
- `test_rapid_acquire_release` - Stress test with rapid acquire/release cycles

The test suite covers:
```bash
# Run all tests
dev-env/bin/pytest resource.test/pytests/factory.core/test_ObjConnectionPool.py -v

# Run specific test class
dev-env/bin/pytest resource.test/pytests/factory.core/test_ObjConnectionPool.py::TestThreadSafety -v

# Run specific test
dev-env/bin/pytest resource.test/pytests/factory.core/test_ObjConnectionPool.py::TestThreadSafety::test_concurrent_acquire_release -v
```
```python
# Best practice - context manager
with pool as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    cursor.close()

# Also good - try/finally
conn = pool.acquire()
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    cursor.close()
finally:
    pool.release(conn)
```
```python
# Periodically check pool health
stats = pool.get_stats()
print(f"Pool size: {stats['pool_size']}/{stats['max_size']}")
print(f"Timeouts: {stats['timeouts']}")
print(f"Total connections: {stats['total_connections']}")
```
```python
# Base pool size on expected concurrent workers
workers = 15
pool = ConnectionPool(
    size=workers,              # Match worker count
    max_overflow=workers // 2  # 50% overflow capacity
)
```
```python
# Application initialization
pool = get_global_pool(size=20, driver="pymysql")

# Throughout application
def some_function():
    pool = get_global_pool()
    with pool as conn:
        # Use connection
        pass
```
```python
from queue import Empty

try:
    conn = pool.acquire(timeout=10.0)
    try:
        # Use connection
        pass
    finally:
        pool.release(conn)
except Empty:
    # Handle timeout
    print("Could not acquire connection - pool exhausted")
except Exception as e:
    # Handle other errors
    print(f"Database error: {e}")
```
```python
pool = ConnectionPool(
    size=30,          # Larger base pool
    max_overflow=20,  # More overflow capacity
    timeout=60.0,     # Longer timeout
    recycle=1800,     # Recycle every 30 minutes
    driver="pymysql"  # Most stable driver
)
```
```python
pool = ConnectionPool(
    size=5,           # Smaller base pool
    max_overflow=3,   # Limited overflow
    timeout=15.0,     # Shorter timeout
    recycle=7200,     # Recycle every 2 hours
    driver="pymysql"
)
```
```python
pool = ConnectionPool(
    size=10,
    max_overflow=5,
    timeout=120.0,    # Longer timeout for slow queries
    recycle=3600,     # Standard recycling
    driver="pymysql"
)
```
- ObjData.md - Main data management class that uses connection pooling
- ObjDataSql.md - SQL-specific operations
- Objects.md - Core framework objects