The decision tree simulation now supports safe multiprocessing with per-process database connections, eliminating the race condition issues that previously forced single-process mode.
Before (shared connection, race condition):

Parent Process:
├─ Creates ObjDecisionSwitch() with DB connection
├─ Stores in SERVICE_CACHE
└─ Forks child processes
   └─ Children inherit DB connection → RACE CONDITION

After (per-process connections):

Parent Process:
└─ Forks child processes (no DB connections yet)
   └─ Each child calls _init_worker_process()
      └─ Creates ObjDecisionSwitch(DB=0) with fresh connection
         └─ Stores in process-local SERVICE_CACHE[1]
pool = multiprocessing.Pool(
    processes=num_processes,
    initializer=_init_worker_process,  # Called AFTER fork
    initargs=(decision_name, do_sim_recon)
)
def _init_worker_process(decision_name: str, do_sim_recon: bool):
    # This runs in each worker process AFTER fork
    service_instance = ObjDecisionSwitch(DB=0)  # Fresh connection!
    SERVICE_CACHE[1] = service_instance

def _simulation_worker(context: SimulationContext):
    service = SERVICE_CACHE[1]  # Process-local instance
    service.compute_decision(context)
Edit factory.core/ObjDecisionSwitch.py line 1158:
# Single-process mode (safe, tested)
USE_MULTIPROCESSING = False
# Multiprocessing mode (faster, needs testing)
USE_MULTIPROCESSING = True
When enabled, the worker count defaults to the CPU count, capped to avoid overwhelming the database:
if USE_MULTIPROCESSING:
    num_processes = multiprocessing.cpu_count()  # e.g., 8
    if num_processes > 4:
        num_processes = 4  # Limit to 4 to avoid overwhelming DB
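The same cap can be expressed in a single line; the limit of 4 is the current default rather than a hard requirement:

num_processes = min(multiprocessing.cpu_count(), 4)  # equivalent one-line form of the cap above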
# Run a small simulation with multiprocessing enabled
decision = ObjDecisionSwitch()
decision.simulate("TestDecision", input_guid="TEST_GUID")
Expected: completes without errors
Watch for: Connection errors, deadlocks, incorrect results
# Test with larger dataset
# Ensure input table has 1000+ rows
decision.simulate("ProductionDecision")
Monitor:
SHOW PROCESSLIST; in MySQL
ps aux | grep python
top

# Compare results: single-process vs multiprocessing
# 1. Run with USE_MULTIPROCESSING = False
decision.simulate("TestDecision")
results_single = get_simulation_results("TestDecision")
# 2. Run with USE_MULTIPROCESSING = True
decision.simulate("TestDecision")
results_multi = get_simulation_results("TestDecision")
# 3. Compare outcomes
assert results_single == results_multi
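Because imap_unordered completes tasks in arbitrary order, the multiprocessing run may return rows in a different order. Assuming get_simulation_results returns a list of comparable rows, an order-insensitive check is the safer comparison:

# Hedged variant: ignore row ordering differences between the two runs
assert sorted(results_single) == sorted(results_multi)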
Symptom: "Too many connections" error from MySQL
Solution: Limit num_processes or increase MySQL max_connections
-- Check current limit
SHOW VARIABLES LIKE 'max_connections';
-- Increase if needed
SET GLOBAL max_connections = 200;
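While a multiprocessing run is active, SHOW STATUS LIKE 'Threads_connected'; shows how close the server actually is to that limit; the simulation needs roughly one connection per worker process on top of whatever the rest of the application already holds.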
Symptom: System slowdown, OOM killer
Solution: Reduce num_processes or use chunking
if num_processes > 4:
    num_processes = 4  # Already implemented
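Chunking is not implemented yet; a minimal sketch of what it could look like in the multiprocessing path of simulate() shown further below (the batch size is illustrative), keeping only one slice of context_list in flight at a time:

BATCH_SIZE = 500  # illustrative value, tune to available memory
for start in range(0, len(context_list), BATCH_SIZE):
    batch = context_list[start:start + BATCH_SIZE]
    # Only this batch's tasks and results are pending at any moment
    for _ in pool.imap_unordered(_simulation_worker, batch):
        progress.update(task_progress, advance=1)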
Symptom: Incorrect results, crashes
Solution: Verify no shared state between processes
Symptom: Multiprocessing fails on Windows
Root Cause: Windows uses 'spawn' instead of 'fork'
Solution: Already handled by _init_worker_process() design
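Because per-process state is rebuilt by the initializer rather than inherited from the parent, the design also works under 'spawn'. A hedged way to reproduce the Windows start method on Linux/macOS for testing (the decision name and flag here are illustrative):

import multiprocessing

# Force the 'spawn' start method to mimic Windows behavior during testing
ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(
    processes=2,
    initializer=_init_worker_process,
    initargs=("TestDecision", False),
)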
_init_worker_process()

def _init_worker_process(decision_name: str, do_sim_recon: bool) -> None:
    """Pool initializer - creates per-process DB connection"""
    service_instance = ObjDecisionSwitch(DB=0)
    service_instance.read(decision_name)
    service_instance.DO_SIM_RECON = do_sim_recon
    SERVICE_CACHE[1] = service_instance
_simulation_worker()

# Before: def _simulation_worker(args: tuple[SimulationContext, str])
# After:  def _simulation_worker(context: SimulationContext)
# Simplified signature - no run_id needed
# Each process uses SERVICE_CACHE[1]
simulate()

# Single-process path (USE_MULTIPROCESSING = False)
service_instance = ObjDecisionSwitch(DB=0)
SERVICE_CACHE[1] = service_instance
for context in track(context_list):
    _simulation_worker(context)

# Multiprocessing path (USE_MULTIPROCESSING = True)
pool = multiprocessing.Pool(
    processes=num_processes,
    initializer=_init_worker_process,
    initargs=(decision_name, self.DO_SIM_RECON)
)
for _ in pool.imap_unordered(_simulation_worker, context_list):
    progress.update(task_progress, advance=1)
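The excerpt above omits shutdown; once every context has been processed, the pool should be closed and joined (a standard multiprocessing pattern, assumed rather than shown in the excerpt) so the worker processes and their connections are released:

pool.close()  # no further tasks will be submitted
pool.join()   # wait for workers to exit, which also closes their per-process connections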
Start with a conservative worker count:
num_processes = 2

Add environment variable control:
import os
USE_MULTIPROCESSING = os.getenv('DECISION_TREE_MULTIPROCESSING', 'False') == 'True'
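With this in place, the flag can be toggled per run by exporting DECISION_TREE_MULTIPROCESSING=True in the environment instead of editing line 1158.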
Ensure MySQL is configured for concurrent connections:
# my.cnf
max_connections = 200
thread_cache_size = 16
Add metrics to track:
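The concrete metric list is not spelled out here; one minimal sketch is to time each compute_decision call inside the worker and log it per process (the timing and logging lines below are illustrative additions, not existing code):

import logging
import os
import time

def _simulation_worker(context: SimulationContext):
    service = SERVICE_CACHE[1]
    start = time.monotonic()
    service.compute_decision(context)
    # Illustrative metric: per-context compute time, tagged with the worker PID
    logging.info("pid=%s compute_decision took %.3fs", os.getpid(), time.monotonic() - start)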
# Objects.py __init__:
if context_in == 0:
    self.cntx = Connection()
    self.DB = self.cntx.connect_db()  # Creates NEW connection
Without DB=0, the connection inherited from the parent process would be reused, causing race conditions.
Each process gets its own copy of module-level variables after fork:
# Parent: SERVICE_CACHE = {1: <instance with parent DB>}
# Child 1: SERVICE_CACHE = {1: <instance with child DB 1>}
# Child 2: SERVICE_CACHE = {1: <instance with child DB 2>}
Since each process only needs ONE service instance (they process tasks sequentially), we always use SERVICE_CACHE[1]. The old pattern of rotating through multiple run_ids was unnecessary.
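A self-contained toy script, independent of ObjDecisionSwitch, that demonstrates the same effect: after the pool initializer runs, every worker sees its own process-local value under the same key:

import multiprocessing
import os

CACHE = {}

def _init():
    # Runs once per worker process; each process fills its own copy of CACHE
    CACHE[1] = f"instance-for-pid-{os.getpid()}"

def _who(_task):
    return os.getpid(), CACHE[1]

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2, initializer=_init) as pool:
        for pid, value in pool.map(_who, range(4)):
            print(pid, value)  # every PID reports the instance created in that process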
SELECT * FROM information_schema.PROCESSLIST
WHERE COMMAND != 'Sleep';
-- Check output table growth
SELECT COUNT(*) FROM bloom_sim_DecisionName_sim_output;
-- Watch in real-time
SELECT COUNT(*) AS row_count, NOW() AS checked_at
FROM bloom_sim_DecisionName_sim_output;
# Check Python processes
ps aux | grep python | grep ObjDecisionSwitch
# Monitor CPU/Memory
top -p $(pgrep -f ObjDecisionSwitch)
The multiprocessing implementation is ready for testing. The architecture properly handles per-process database connections, eliminating the race conditions that previously required single-process mode.
Next Steps:
Toggle: change USE_MULTIPROCESSING from False to True at line 1158 of ObjDecisionSwitch.py