Date: 2025-12-14
Status: Production-Ready
Version: 8.0
Best for: Multi-threaded applications, FastAPI workers, ThreadPoolExecutor
from ObjDataThreadSafe import ObjDataThreadSafe

# Simple usage - just works!
def worker():
    """Fetch all users on this thread's own pooled connection."""
    obj = ObjDataThreadSafe()  # Automatic thread-local connection
    users = obj.sql_get_values("SELECT * FROM users")
    return users

# Safe to run in multiple threads
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(worker) for _ in range(100)]
    results = [f.result() for f in futures]
Benefits:
Best for: When you need fine-grained control over connections
from ObjConnectionPool import ConnectionPool
from ObjData import ObjData
from concurrent.futures import ThreadPoolExecutor  # required by the demo below

# Create pool once
pool = ConnectionPool(size=20, config_name="primary")

def worker():
    """Run a query on a connection borrowed from the shared pool."""
    # Acquire connection from pool
    with pool as conn:
        obj = ObjData(DB=conn)
        users = obj.sql_get_values("SELECT * FROM users")
        return users
    # Connection automatically returned to pool

# Use in threads
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(worker) for _ in range(100)]
    results = [f.result() for f in futures]
Benefits:
Best for: Single-threaded applications, simple scripts
from ObjData import ObjData
from concurrent.futures import ThreadPoolExecutor  # required by the demo below

# Single thread - standard usage
obj = ObjData()
users = obj.sql_get_values("SELECT * FROM users")

# Multi-threaded - create instance per thread
def worker():
    """Each call opens a fresh connection - works, but is wasteful."""
    obj = ObjData()  # New connection per thread
    users = obj.sql_get_values("SELECT * FROM users")
    return users

# Works, but creates many connections
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(worker) for _ in range(100)]
Limitations:
from fastapi import FastAPI, BackgroundTasks
from ObjDataThreadSafe import ObjDataThreadSafe

app = FastAPI()

# ObjDataThreadSafe handles thread safety automatically
obj = ObjDataThreadSafe(pool_size=20)

@app.get("/users")
async def get_users():
    """Each request runs in its own thread - safe to share obj"""
    users = obj.sql_get_values("SELECT * FROM users LIMIT 100")
    return {"users": users}

@app.post("/users")
async def create_user(name: str, email: str):
    """Concurrent POST requests are safe"""
    # WARNING: interpolating request values into SQL is injection-prone.
    # Prefer parameterized queries if sql_execute supports them - TODO confirm.
    sql = f"INSERT INTO users (name, email) VALUES ('{name}', '{email}')"
    obj.sql_execute(sql)
    return {"status": "created"}

@app.get("/stats")
async def get_stats():
    """Check connection pool statistics"""
    return ObjDataThreadSafe.get_pool_stats()
Why this works:
from ObjConnectionPool import ConnectionPool
from ObjData import ObjData
import threading

# Create global pool
pool = ConnectionPool(
    size=10,         # 10 base connections
    max_overflow=5,  # Up to 5 extra connections
    timeout=30,      # 30 second timeout
    recycle=3600,    # Recycle connections after 1 hour
)

def process_order(order_id: int):
    """Background task to process an order"""
    with pool as conn:
        obj = ObjData(DB=conn)
        # Get order details
        order = obj.sql_get_values(
            f"SELECT * FROM orders WHERE id = {order_id}"
        )
        # Process payment
        obj.sql_execute(
            f"UPDATE orders SET status = 'processing' WHERE id = {order_id}"
        )
        # Send notification
        obj.sql_execute(
            f"INSERT INTO notifications (order_id, message) "
            f"VALUES ({order_id}, 'Order processing')"
        )

# Start 100 background tasks
threads = []
for order_id in range(1, 101):
    t = threading.Thread(target=process_order, args=(order_id,))
    threads.append(t)
    t.start()

# Wait for all to complete
for t in threads:
    t.join()

print(f"Pool stats: {pool.get_stats()}")
from concurrent.futures import ThreadPoolExecutor, as_completed
from ObjDataThreadSafe import ObjDataThreadSafe

# Create shared thread-safe instance
db = ObjDataThreadSafe(pool_size=15)

def extract_data(source_table: str):
    """Extract data from source table"""
    return db.sql_get_values(f"SELECT * FROM {source_table}")

def transform_data(data):
    """Transform data (CPU-bound)"""
    # Apply transformations (process_row is assumed to be defined by the
    # application - not shown in this example).
    return [process_row(row) for row in data]

def load_data(table: str, data):
    """Load data into destination table"""
    for row in data:
        db.sql_execute(
            f"INSERT INTO {table} VALUES ({row})"
        )

# ETL pipeline with multiple sources
sources = ["sales", "inventory", "customers", "products"]

with ThreadPoolExecutor(max_workers=10) as executor:
    # Extract in parallel
    extract_futures = {
        executor.submit(extract_data, source): source
        for source in sources
    }
    for future in as_completed(extract_futures):
        source = extract_futures[future]
        data = future.result()
        # Transform and load
        transformed = transform_data(data)
        load_data(f"staging_{source}", transformed)

print("ETL complete!")
print(f"Pool stats: {ObjDataThreadSafe.get_pool_stats()}")
from queue import Queue
from threading import Thread
from ObjDataThreadSafe import ObjDataThreadSafe

# Task queue
task_queue = Queue()

# Worker function
def worker(worker_id: int):
    """Process tasks from queue"""
    obj = ObjDataThreadSafe()
    while True:
        task = task_queue.get()
        if task is None:  # Shutdown signal
            break
        try:
            # Process task
            task_type, task_data = task
            if task_type == "insert":
                obj.sql_execute(
                    f"INSERT INTO logs VALUES ('{task_data}')"
                )
            elif task_type == "select":
                result = obj.sql_get_values(
                    f"SELECT * FROM data WHERE id = {task_data}"
                )
                print(f"Worker {worker_id}: {result}")
        finally:
            # Always mark the task done, even if processing raised
            task_queue.task_done()
    print(f"Worker {worker_id} shutting down")
    ObjDataThreadSafe.close_thread_connection()

# Start worker threads
num_workers = 5
workers = []
for i in range(num_workers):
    t = Thread(target=worker, args=(i,))
    t.start()
    workers.append(t)

# Add tasks
for i in range(100):
    if i % 2 == 0:
        task_queue.put(("insert", f"log-{i}"))
    else:
        task_queue.put(("select", i))

# Wait for tasks to complete
task_queue.join()

# Shutdown workers
for _ in range(num_workers):
    task_queue.put(None)
for w in workers:
    w.join()
Add to config.yaml:
database:
  primary:
    ip: "localhost"
    user: "myuser"
    password: "^encrypted_password"
    db: "mydatabase"

  # Connection pool settings
  # (nested under "database" to match config.Get("database", "pool.size"))
  pool:
    size: 10          # Base pool size
    max_overflow: 5   # Extra connections allowed
    timeout: 30       # Connection acquisition timeout (seconds)
    recycle: 3600     # Recycle connections after 1 hour
    pre_ping: true    # Test connection before use
import Objects
from ObjConnectionPool import ConnectionPool

# Read pool settings from config (string defaults, cast to the right type)
config = Objects.global_config
pool_size = int(config.Get("database", "pool.size", "10"))
max_overflow = int(config.Get("database", "pool.max_overflow", "5"))
timeout = float(config.Get("database", "pool.timeout", "30"))
recycle = int(config.Get("database", "pool.recycle", "3600"))

# Create pool with config settings
pool = ConnectionPool(
    size=pool_size,
    max_overflow=max_overflow,
    timeout=timeout,
    recycle=recycle,
)
from ObjDataThreadSafe import ObjDataThreadSafe

# Get pool stats
stats = ObjDataThreadSafe.get_pool_stats()

print(f"""
Connection Pool Statistics:
Total connections: {stats['total_connections']}
Pool size: {stats['pool_size']}
Max size: {stats['max_size']}
Created: {stats['created']}
Recycled: {stats['recycled']}
Acquired: {stats['acquired']}
Released: {stats['released']}
Timeouts: {stats['timeouts']}
""")
Enable debug logging to see connection activity:
# In ObjConnectionPool.py
DO_DEBUG = True
# In ObjDataThreadSafe.py
DO_DEBUG = True
# Now you'll see:
# ConnectionPool: Created new connection (total: 5)
# ObjDataThreadSafe: Thread 140234 acquired connection
# ConnectionPool: Acquired connection (pool size: 4)
# ConnectionPool: Released connection (pool size: 5)
-- Check current connections
SHOW STATUS LIKE 'Threads_connected';
-- List active connections
SELECT * FROM information_schema.PROCESSLIST;
-- Check max connections
SHOW VARIABLES LIKE 'max_connections';
-- Connection history
SELECT * FROM mysql.general_log WHERE command_type = 'Connect';
Formula: pool_size = (num_cores * 2) + effective_spindle_count
For typical web application:
- Small apps: size=5, max_overflow=2
- Typical web apps: size=10, max_overflow=5
- Busy services: size=20, max_overflow=10
- High-traffic services: size=50, max_overflow=20

Purpose: Prevent stale connections
pool = ConnectionPool(
    size=10,
    recycle=3600,  # Recycle after 1 hour
)
Recommendations:
- Short-lived workloads: recycle=600 (10 minutes)
- Typical applications: recycle=3600 (1 hour)
- Long-running services: recycle=7200 (2 hours)

pool = ConnectionPool(
    size=10,
    timeout=30,  # Wait up to 30 seconds for connection
)
Guidelines:
- timeout=5 (fail fast)
- timeout=30 (default)
- timeout=60 (can wait)

Before (not thread-safe):
def worker():
    """Before: opens a brand-new connection on every call."""
    obj = ObjData()  # New connection every call
    users = obj.sql_get_values("SELECT * FROM users")
    return users
After (thread-safe with pooling):
def worker():
    """After: reuses a pooled, thread-local connection."""
    obj = ObjDataThreadSafe()  # Pooled, thread-local connection
    users = obj.sql_get_values("SELECT * FROM users")
    return users
Changes required: Replace ObjData with ObjDataThreadSafe
Compatibility: 100% backwards compatible
Before:
# Create connections manually
connections = []
for i in range(10):
    conn = Objects.Connection().connect_db()
    connections.append(conn)

def worker(conn):
    obj = ObjData(DB=conn)
    # ...

# Manually assign connections to threads
After:
# Let pool manage connections
pool = ConnectionPool(size=10)

def worker():
    with pool as conn:
        obj = ObjData(DB=conn)
        # ...
Symptom: `Empty: Connection pool timeout after 30s` exception when acquiring a connection
Causes:
Solutions:
# Increase pool size
pool = ConnectionPool(size=20, max_overflow=10)

# Increase timeout
pool = ConnectionPool(timeout=60)

# Ensure connections are released
with pool as conn:
    obj = ObjData(DB=conn)
    # ... use connection ...
# Connection auto-released here
Symptom: MySQL error "Too many connections"
Cause: Total connections exceed MySQL max_connections
Solutions:
# Reduce pool size
pool = ConnectionPool(
    size=10,         # Lower base size
    max_overflow=5,  # Lower overflow
)

# Or increase MySQL limit
# In my.cnf:
# max_connections = 500
Symptom: Cursor errors in multi-threaded code
Cause: Sharing ObjData instance across threads
Solution: Use ObjDataThreadSafe:
# DON'T: Share ObjData instance
obj = ObjData()

def worker():
    obj.sql_get_values(...)  # ⚠️ Cursor conflict!

# DO: Use ObjDataThreadSafe
obj = ObjDataThreadSafe()

def worker():
    obj.sql_get_values(...)  # ✅ Thread-safe!
Use ObjDataThreadSafe for multi-threaded apps
obj = ObjDataThreadSafe(pool_size=20)
Configure appropriate pool size
pool = ConnectionPool(size=num_cores * 2)
Monitor pool statistics
stats = ObjDataThreadSafe.get_pool_stats()
if stats['timeouts'] > 0:
    # Increase pool size
    ...
Use context managers for explicit control
with pool as conn:
    obj = ObjData(DB=conn)
Close thread connections when done
def worker():
    obj = ObjDataThreadSafe()
    # ... work ...
    ObjDataThreadSafe.close_thread_connection()
Don't share standard ObjData across threads
# BAD
obj = ObjData()

def worker():
    obj.sql_get_values(...)  # Cursor conflict!
Don't create excessive connections
# BAD
def worker():
    for i in range(1000):
        obj = ObjData()  # 1000 connections!
Don't forget to release pooled connections
# BAD
conn = pool.acquire()
# ... use connection ...
# Forgot to call pool.release(conn)!
Don't use tiny pools for high concurrency
# BAD for 100 concurrent requests
pool = ConnectionPool(size=2)
| Pattern | Use Case | Thread-Safe | Pooled | Complexity |
|---|---|---|---|---|
| ObjDataThreadSafe | Multi-threaded apps | ✅ Yes | ✅ Yes | Low |
| ObjData + ConnectionPool | Explicit control | ✅ Yes | ✅ Yes | Medium |
| Standard ObjData | Single-threaded | ⚠️ Per-thread | ❌ No | Low |
Recommendation: Use ObjDataThreadSafe for new multi-threaded applications.
Next Steps:
Replace ObjData() with ObjDataThreadSafe() in multi-threaded code.

References:
- factory.core/ObjData_ThreadSafety_Analysis.md - Detailed analysis
- factory.core/ObjConnectionPool.py - Pool implementation
- factory.core/ObjDataThreadSafe.py - Thread-safe wrapper