ObjDataThreadSafe is a thread-safe wrapper for ObjData that provides automatic per-thread database connection management. Each thread automatically gets its own database connection from a connection pool, preventing cursor conflicts and making ObjData safe for multi-threaded use without manual connection management.
Module: factory.core/ObjDataThreadSafe.py
Inherits from: ObjData.ObjData
Test file: None (integration tested through ObjConnectionPool tests)
Version: 8.0
This module enables developers to:
Each thread automatically receives its own database connection from a connection pool. No manual connection management required.
Built on top of ObjConnectionPool, providing:
Uses thread-local storage to ensure each thread's connection is isolated from others, preventing race conditions and cursor conflicts.
Automatically detects and recovers from stale database connections through connection pool reacquisition.
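One way to picture recovery through pool reacquisition is a retry wrapper: when a statement fails with an error that looks stale, the broken connection is discarded, a fresh one is acquired from the pool, and the statement is retried once. The sketch below illustrates that idea under stated assumptions and is not the module's implementation; `ToyPool`, `FlakyConnection`, `StaleConnectionError`, and `run_with_reconnect` are hypothetical names.

```python
import queue


class StaleConnectionError(Exception):
    """Hypothetical error standing in for a stale-connection signal."""


class FlakyConnection:
    """Hypothetical connection that fails if it was marked stale."""

    def __init__(self, conn_id, stale=False):
        self.conn_id = conn_id
        self.stale = stale

    def execute(self, sql):
        if self.stale:
            raise StaleConnectionError(f"connection {self.conn_id} is stale")
        return f"conn {self.conn_id} ran: {sql}"


class ToyPool:
    """Hypothetical pool: hands out queued connections, replaces discarded ones."""

    def __init__(self, connections):
        self._queue = queue.Queue()
        self._next_id = len(connections)
        for conn in connections:
            self._queue.put(conn)

    def acquire(self):
        return self._queue.get()

    def discard_and_replace(self, conn):
        # Drop the broken connection and queue a fresh one in its place.
        fresh = FlakyConnection(self._next_id)
        self._next_id += 1
        self._queue.put(fresh)


def run_with_reconnect(pool, conn, sql):
    """Run sql; on a stale-connection error, reacquire and retry once."""
    try:
        return conn, conn.execute(sql)
    except StaleConnectionError:
        pool.discard_and_replace(conn)
        conn = pool.acquire()
        return conn, conn.execute(sql)


pool = ToyPool([FlakyConnection(0, stale=True)])
conn = pool.acquire()
conn, result = run_with_reconnect(pool, conn, "SELECT 1")
print(result)
```

Retrying only once keeps a persistent outage from turning into an endless loop; a second failure propagates to the caller.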
Provides two classes for different use cases:
- ObjDataThreadSafe: Automatic thread-local connection management
- ObjDataPooled: Explicit connection acquisition with context managers

Thread-safe wrapper with automatic per-thread connections.
- _thread_local: threading.local() - Thread-local storage for connections
- _pool: Optional[ConnectionPool] - Class-level connection pool
- _pool_lock: threading.Lock() - Lock for pool initialization
- _instance_pool: ConnectionPool - Pool used by this instance

ObjData with explicit connection pool context manager.
- _pool: ConnectionPool - Connection pool reference
- _conn: DatabaseConnection - Acquired connection

def __init__(
    self,
    pool: Optional[ConnectionPool] = None,
    pool_size: int = 10,
    config_name: str = "primary",
    use_tui: bool = False,
)
Initialize thread-safe ObjData instance.
Parameters:
- pool: Custom ConnectionPool instance (None = use global pool)
- pool_size: Pool size if creating new pool (default: 10)
- config_name: Database config name (default: "primary")
- use_tui: Enable terminal UI manager

Thread Safety: Each thread automatically gets its own connection from pool. Safe to share instance across threads.
Example:
import threading

# Simple usage - connection pool created automatically
obj = ObjDataThreadSafe()
result = obj.sql_get_values("SELECT * FROM users")

# With custom pool
pool = ConnectionPool(size=20)
obj = ObjDataThreadSafe(pool=pool)

# Multiple threads - each worker gets its own connection automatically
def worker():
    obj = ObjDataThreadSafe()  # Each thread gets own connection
    obj.sql_get_values("SELECT * FROM users")

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
# Wait for all workers to finish
for t in threads:
    t.join()
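The `pool=None` default implies a shared pool that is created on first use. A common way to make that safe, and a plausible reading of the `_pool_lock` attribute, is double-checked initialization under a lock. The sketch below assumes that pattern; `SharedPoolOwner` and `FakePool` are hypothetical stand-ins, and the real class may behave differently.

```python
import threading


class FakePool:
    """Hypothetical stand-in for ConnectionPool; only records its size."""

    created = 0  # counts how many pools were ever constructed

    def __init__(self, size):
        self.size = size
        FakePool.created += 1


class SharedPoolOwner:
    """Sketch of a class-level pool that is created at most once."""

    _pool = None
    _pool_lock = threading.Lock()

    def __init__(self, pool=None, pool_size=10):
        if pool is not None:
            # Caller supplied a pool: use it for this instance only.
            self._instance_pool = pool
        else:
            self._instance_pool = self._get_shared_pool(pool_size)

    @classmethod
    def _get_shared_pool(cls, pool_size):
        # Fast path: skip the lock once the pool exists.
        if cls._pool is None:
            with cls._pool_lock:
                # Re-check inside the lock so only one thread creates it.
                if cls._pool is None:
                    cls._pool = FakePool(pool_size)
        return cls._pool


owners = []
threads = [
    threading.Thread(target=lambda: owners.append(SharedPoolOwner()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(FakePool.created)
```

In this sketch, `pool_size` only takes effect for the instance that triggers pool creation; later values are ignored because the shared pool already exists.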
def _get_thread_connection(self) -> DatabaseConnection
Get the database connection for the current thread. On a thread's first access, a connection is acquired from the pool and kept in thread-local storage for later calls.
Returns: Thread-local database connection
Thread-safe: Uses thread-local storage
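The lazy, per-thread lookup described here can be sketched with `threading.local()`: each thread sees its own attribute namespace, so a connection stored there is invisible to other threads. This is a minimal illustration, not the module's code; `ToyPool` and `PerThreadConnections` are hypothetical, and connections are modeled as plain strings.

```python
import queue
import threading


class ToyPool:
    """Hypothetical pool that hands out string placeholders for connections."""

    def __init__(self, size):
        self._queue = queue.Queue()
        for conn_id in range(size):
            self._queue.put(f"conn-{conn_id}")

    def acquire(self):
        return self._queue.get()


class PerThreadConnections:
    """Sketch of lazy per-thread connection lookup."""

    _thread_local = threading.local()

    def __init__(self, pool):
        self._instance_pool = pool

    def get_thread_connection(self):
        # Each thread has its own view of _thread_local's attributes.
        conn = getattr(self._thread_local, "conn", None)
        if conn is None:
            # First access from this thread: take a connection from the pool.
            conn = self._instance_pool.acquire()
            self._thread_local.conn = conn
        return conn


holder = PerThreadConnections(ToyPool(size=4))
seen = {}


def worker(name):
    first = holder.get_thread_connection()
    second = holder.get_thread_connection()  # same thread, same connection
    seen[name] = (first, second)


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(first for first, _ in seen.values()))
```

A single `holder` is shared by all four threads, yet each thread ends up with a different connection and gets the same one back on repeated calls.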
@classmethod
def close_thread_connection(cls):
Close connection for current thread and return to pool. Call this when thread is done with database operations to release connection back to pool immediately (otherwise released on thread exit).
Thread-safe: Operates on thread-local storage only
Example:
def worker_task():
    obj = ObjDataThreadSafe()
    try:
        # Do database work
        obj.sql_get_values("SELECT * FROM users")
    finally:
        # Explicitly release connection
        ObjDataThreadSafe.close_thread_connection()
@classmethod
def get_pool_stats(cls) -> dict
Get statistics from the connection pool.
Returns: Dictionary with pool statistics
Thread-safe: Returns copy of stats
Example:
stats = ObjDataThreadSafe.get_pool_stats()
print(f"Active connections: {stats['active']}")
print(f"Available connections: {stats['available']}")
@classmethod
def close_all_connections(cls):
Close all connections in the pool.
Warning: This closes all connections, including those in use by threads! Only use during shutdown.
def db_reconnect(self, error_connect: str, sql: str, db=0)
Handle stale database connections by reacquiring from pool.
Process:
Stale signals detected:
Explicit connection pool context manager pattern.
def __init__(self, pool: ConnectionPool):
Initialize ObjData with pooled connection.
Parameters:
- pool: ConnectionPool to acquire connection from

Thread Safety: Each instance should be created per-thread. Do not share instances across threads.
def release_connection(self):
Return connection to pool.
def __enter__(self):
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    self.release_connection()
    return False
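The effect of these three methods can be seen in a small stand-alone sketch: the connection goes back to the pool even when the body of the `with` block raises, and the exception still propagates because `__exit__` returns `False`. `ToyPool` and `PooledSketch` are hypothetical stand-ins, and acquiring the connection in `__init__` is an assumption about how the real class works.

```python
import queue


class ToyPool:
    """Hypothetical pool with string placeholders for connections."""

    def __init__(self, size):
        self._queue = queue.Queue()
        for conn_id in range(size):
            self._queue.put(f"conn-{conn_id}")

    def acquire(self):
        return self._queue.get()

    def release(self, conn):
        self._queue.put(conn)

    def available(self):
        return self._queue.qsize()


class PooledSketch:
    """Sketch of an object that holds one pooled connection."""

    def __init__(self, pool):
        self._pool = pool
        self._conn = pool.acquire()  # assumption: acquired at construction

    def release_connection(self):
        # Guard so that calling release twice does not return the
        # same connection to the pool twice.
        if self._conn is not None:
            self._pool.release(self._conn)
            self._conn = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release_connection()
        return False  # do not swallow exceptions from the with-body


pool = ToyPool(size=1)
try:
    with PooledSketch(pool) as obj:
        in_use = pool.available()  # the only connection is checked out
        raise RuntimeError("query failed")
except RuntimeError:
    pass
after = pool.available()  # connection is back despite the exception
print(in_use, after)
```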
from ObjDataThreadSafe import ObjDataThreadSafe
# Create thread-safe instance
obj = ObjDataThreadSafe()
# Use like regular ObjData
users = obj.sql_get_list("SELECT username FROM users")
user = obj.sql_get_value("SELECT email FROM users WHERE id = 1")
from ObjDataThreadSafe import ObjDataThreadSafe
import threading
def process_batch(batch_id):
    # Each thread gets its own connection automatically
    obj = ObjDataThreadSafe()
    # Fetch data
    sql = f"SELECT * FROM tasks WHERE batch_id = {batch_id}"
    tasks = obj.sql_get_list(sql)
    # Process tasks
    for task in tasks:
        process_task(task)
    # Connection automatically released when thread exits
# Launch multiple worker threads
threads = []
for batch_id in range(1, 11):
    t = threading.Thread(target=process_batch, args=(batch_id,))
    threads.append(t)
    t.start()
# Wait for completion
for t in threads:
    t.join()
from ObjDataThreadSafe import ObjDataThreadSafe
from ObjConnectionPool import ConnectionPool
# Create custom pool with specific size
pool = ConnectionPool(size=50, config_name="primary")
# Share pool across multiple objects
obj1 = ObjDataThreadSafe(pool=pool)
obj2 = ObjDataThreadSafe(pool=pool)
# Both use same pool
print(ObjDataThreadSafe.get_pool_stats())
from ObjDataThreadSafe import ObjDataPooled
from ObjConnectionPool import ConnectionPool
pool = ConnectionPool(size=10)
# Context manager - connection auto-released
with ObjDataPooled(pool) as obj:
    obj.sql_get_values("SELECT * FROM users")
# Manual acquire/release
obj = ObjDataPooled(pool)
try:
    obj.sql_get_values("SELECT * FROM users")
finally:
    obj.release_connection()
from ObjDataThreadSafe import ObjDataThreadSafe
from concurrent.futures import ThreadPoolExecutor
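def fetch_user(user_id):
    obj = ObjDataThreadSafe()
    sql = f"SELECT * FROM users WHERE id = {user_id}"
    try:
        return obj.sql_get_dictionary(sql)
    finally:
        # Executor threads stay alive between tasks, and 20 workers exceed
        # the default pool size of 10, so release the connection promptly
        ObjDataThreadSafe.close_thread_connection()
# Process users concurrently
with ThreadPoolExecutor(max_workers=20) as executor:
    user_ids = range(1, 101)
    results = list(executor.map(fetch_user, user_ids))
print(f"Fetched {len(results)} users")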
from ObjDataThreadSafe import ObjDataThreadSafe
import time
obj = ObjDataThreadSafe(pool_size=10)
# Monitor pool usage
while True:
    stats = ObjDataThreadSafe.get_pool_stats()
    print(f"Pool stats: {stats}")
    time.sleep(5)
ObjDataThreadSafe instances can be shared safely across threads. ObjDataPooled instances should not be shared: create one per thread.
Each thread receives its own isolated database connection, preventing race conditions and cursor conflicts.
The connection pool itself is shared across threads and uses locks to ensure thread-safe operations.
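A lock-protected pool can be sketched in a few lines. The version below is only an illustration of the idea, not ObjConnectionPool itself: `LockedPool` is a hypothetical class, connections are plain strings, and the `active` / `available` keys mirror the ones used in the `get_pool_stats()` example.

```python
import threading


class LockedPool:
    """Hypothetical pool whose bookkeeping is guarded by a single lock."""

    def __init__(self, size):
        self._lock = threading.Lock()
        self._idle = [f"conn-{i}" for i in range(size)]
        self._active = set()

    def acquire(self):
        with self._lock:
            # Raises IndexError if the pool is exhausted; a real pool
            # would more likely block until a connection is released.
            conn = self._idle.pop()
            self._active.add(conn)
            return conn

    def release(self, conn):
        with self._lock:
            self._active.remove(conn)
            self._idle.append(conn)

    def stats(self):
        with self._lock:
            # Build a new dict so callers never see internal state.
            return {"active": len(self._active), "available": len(self._idle)}


pool = LockedPool(size=4)


def worker():
    for _ in range(100):
        conn = pool.acquire()
        pool.release(conn)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

held = pool.acquire()
snapshot = pool.stats()  # taken while one connection is checked out
pool.release(held)
print(snapshot, pool.stats())
```

Because every mutation happens under the same lock, the counts stay consistent no matter how the four workers interleave.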
# Small applications
obj = ObjDataThreadSafe(pool_size=5)
# Medium load
obj = ObjDataThreadSafe(pool_size=20)
# High concurrency
obj = ObjDataThreadSafe(pool_size=100)
| Feature | ObjDataThreadSafe | ObjDataPooled |
|---|---|---|
| Connection management | Automatic per-thread | Explicit acquire/release |
| Shared across threads | ✓ Yes, safely | ✗ No, create per-thread |
| Context manager | ✗ No | ✓ Yes |
| Use case | Most applications | Fine-grained control |
| Complexity | Low | Medium |
# FastAPI/Flask worker thread
def api_endpoint():
    obj = ObjDataThreadSafe()
    return obj.sql_get_list("SELECT * FROM items")
def job_worker():
    obj = ObjDataThreadSafe()
    while True:
        job = obj.sql_get_dictionary("SELECT * FROM jobs WHERE status = 'pending' LIMIT 1")
        if job:
            process_job(job)
            obj.sql_execute(f"UPDATE jobs SET status = 'complete' WHERE id = {job['id']}")
def batch_processor(batch_size=1000):
    obj = ObjDataThreadSafe()
    offset = 0
    while True:
        sql = f"SELECT * FROM records LIMIT {batch_size} OFFSET {offset}"
        records = obj.sql_get_list(sql)
        if not records:
            break
        process_records(records)
        offset += batch_size
If all connections are in use, new requests will block until a connection becomes available.
Solution: Increase pool size or ensure threads release connections promptly.
# Increase pool size
obj = ObjDataThreadSafe(pool_size=50)
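The blocking behavior is easy to reproduce with a plain `queue.Queue` standing in for the pool. The sketch below shows an acquire that times out while the only connection is held, then one that succeeds once another thread releases it. Whether ConnectionPool accepts a timeout is not stated here, so treat the `timeout` argument as a property of `queue.Queue`, not of the real pool.

```python
import queue
import threading
import time

# A queue with one item models a pool of size 1.
pool = queue.Queue()
pool.put("conn-0")

held = pool.get()  # the main thread takes the only connection

# A second acquire with a timeout fails fast instead of blocking forever.
try:
    pool.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True


def release_later():
    # Simulate another thread finishing its work and releasing.
    time.sleep(0.1)
    pool.put(held)


threading.Thread(target=release_later).start()
reacquired = pool.get(timeout=2)  # blocks until release_later runs
print(timed_out, reacquired)
```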
The module automatically handles stale connections through the db_reconnect() method.
If threads don't exit cleanly, connections may not be returned to the pool.
Solution: Use try/finally or context managers to ensure cleanup.
- threading - Thread-local storage and locks
- ObjData - Parent class for database operations
- ObjConnectionPool - Connection pool management

- ObjConnectionPool.py - Connection pool implementation
- ObjData.py - Parent class providing database operations
- Objects.py - Base object definitions

- Create one ObjDataThreadSafe instance per thread and reuse it for multiple operations
- Skip close_thread_connection() unless you need immediate release
- Use get_pool_stats() to track pool health
- Call close_all_connections() during application shutdown