ObjSimulation extends ObjData to provide simulation-specific functionality, including comprehensive table reference tracking during simulation runs. It monitors which database tables are read from or written to during calculation and workflow simulations, enabling dependency analysis and optimization.
Module: factory.core/ObjSimulation.py
Inherits from: ObjData.ObjData
Test files: resource.test/pytests/factory.core/test_ObjSimulation_proxy.py, test_ObjSimulation_sql_tracking.py
Database table: def_simulation_table
This module enables developers to:
| Field | Type | Description |
|---|---|---|
| SimulationGuid | VARCHAR(100) | Unique identifier for simulation run |
| Package | VARCHAR(100) | Package name |
| Module | VARCHAR(100) | Module being tracked |
| TableName | VARCHAR(150) | Name of table accessed |
| Operation | VARCHAR(10) | 'read' or 'write' |
| TrackedAt | DATETIME | Timestamp of tracking |
Primary Key: (SimulationGuid, Module, TableName, Operation)
Indexes:
_active_simulations: Dict[str, Dict[str, Any]] = {}
_active_simulations_lock = threading.RLock()
Thread-safe registry allowing child objects to inherit tracking context from active simulations.
_simulation_guid: GUID identifying current simulation run_tracking_module: Name of module being tracked_tracked_tables: Dict with 'read' and 'write' sets of table names_tracking_enabled: Boolean flag for tracking state_persisted_tables: Cache of tables already persisted to database_simulation_guid_sql: SQL query used to retrieve simulation GUIDs__init__(db_connection=0)Initializes the simulation object with table tracking capabilities.
Key behavior:
_inherit_active_simulation_tracking() -> NoneChecks for active simulations and inherits tracking context.
Process:
Use case: Enables child objects (like ObjServiceCalculation) created during workflow execution to automatically participate in parent's table tracking.
enable_table_tracking(simulation_guid: str, module_name: str, simulation_guid_sql: Optional[str] = None) -> NoneEnables table tracking for a simulation run.
Parameters:
simulation_guid: GUID identifying this simulation runmodule_name: Name of module being tracked (e.g., 'ObjServiceCalculation')simulation_guid_sql: SQL query used to retrieve simulation GUIDsBehavior:
Example:
sim = ObjSimulation()
sim.enable_table_tracking(
simulation_guid="sim-12345",
module_name="ObjServiceCalculation",
simulation_guid_sql="SELECT guid FROM calculations WHERE active = 1"
)
track_sql(sql: str) -> NoneTracks tables accessed in a SQL query.
Parameters:
sql: SQL query string to analyzeProcess:
_determine_table_operations() to parse SQLExample:
sim.track_sql("SELECT * FROM users WHERE active = 1")
# Tracks: {'read': {'users'}, 'write': set()}
sim.track_sql("INSERT INTO logs (message) VALUES ('test')")
# Tracks: {'read': set(), 'write': {'logs'}}
flush_tracked_tables() -> NonePersists currently tracked tables to database without disabling tracking.
Use case: Call periodically during long-running simulations to persist intermediate results.
disable_table_tracking() -> NoneDisables table tracking and persists tracked data.
Process:
_determine_table_operations(sql_statement: str) -> Dict[str, Set[str]]Parses SQL statement and returns tables read from and written to.
Returns:
Processing:
SQL Statement Type Classification:
Example:
ops = sim._determine_table_operations(
"UPDATE users u JOIN accounts a ON u.account_id = a.id SET u.status = 'active'"
)
# Returns: {'read': {'users', 'accounts'}, 'write': {'users'}}
_extract_tables_from_tokens(sql_query: str) -> Set[str]Helper method to extract table names from SQL using sqlglot.
Returns:
Error handling:
_parse_and_track_sql(sql: str, sql_type: str) -> NoneHelper method to parse SQL and add tables to tracked tables.
Parameters:
sql: SQL query to parsesql_type: Description of SQL type (for logging)Behavior:
_persist_tracked_tables() -> NonePersists tracked tables to def_simulation_table database table.
Process:
from ObjSimulation import ObjSimulation
# Create simulation instance
sim = ObjSimulation()
# Enable tracking
sim.enable_table_tracking(
simulation_guid="sim-uuid-123",
module_name="MyCalculation"
)
# Execute queries (tracking happens automatically if sql_execute/sql_get_* used)
sim.sql_execute("SELECT * FROM customers WHERE active = 1")
sim.sql_execute("INSERT INTO results (value) VALUES (42)")
# Disable and persist
sim.disable_table_tracking()
from ObjSimulation import ObjSimulation
# Parent workflow object
workflow = ObjSimulation()
workflow.enable_table_tracking("workflow-123", "ObjWorkflow")
# Child calculation object (automatically inherits tracking)
from ObjServiceCalculation import ObjServiceCalculation
calc = ObjServiceCalculation() # Inherits tracking from workflow
calc.execute() # Table access is tracked
# Disable tracking on parent (persists all tracked tables)
workflow.disable_table_tracking()
sim = ObjSimulation()
sim.enable_table_tracking("long-sim-456", "LongProcess")
for batch in large_dataset:
# Process batch...
process_batch(batch)
# Flush tracked tables periodically
sim.flush_tracked_tables()
sim.disable_table_tracking()
sim = ObjSimulation()
# Analyze complex query
ops = sim._determine_table_operations("""
UPDATE orders o
JOIN customers c ON o.customer_id = c.id
SET o.status = 'processed'
WHERE c.region = 'US'
""")
print(f"Read tables: {ops['read']}") # {'orders', 'customers'}
print(f"Write tables: {ops['write']}") # {'orders'}
The module has comprehensive test coverage across two test files:
Tests proxy pattern for SQL method interception and tracking.
Tests SQL parsing and table tracking functionality.
Run tests:
pytest resource.test/pytests/factory.core/test_ObjSimulation_proxy.py -v
pytest resource.test/pytests/factory.core/test_ObjSimulation_sql_tracking.py -v
sqlglot - SQL parsing and analysisyaml - Configuration file parsingthreading - Thread-safe simulation registryre - Regular expression for ANSI code cleaningObjData - Parent class providing database operationsThe module only enables tracking in SIM (simulation) deployment:
| Deployment | Tracking Enabled |
|---|---|
| SIM | ✓ Yes |
| DEV | ✗ No |
| UAT | ✗ No |
| PROD | ✗ No |
This prevents performance overhead in non-simulation environments.
The module handles two patterns of ANSI escape codes:
\x1B[0m, \x9B...[0m, [4m (without escape character)Normalizes INSERT IGNORE to INSERT for parsing compatibility with sqlglot.
Gracefully handles SQL that sqlglot cannot parse (e.g., some MySQL-specific syntax), returning empty table sets instead of crashing.
# In ObjWorkflow or ObjWorkflowSimul
workflow = ObjSimulation()
workflow.enable_table_tracking(workflow_guid, "ObjWorkflow")
# All SQL executed during workflow is tracked
workflow.execute_steps()
# Child objects inherit tracking automatically
# In ObjServiceCalculation or similar
calc = ObjSimulation()
calc.enable_table_tracking(calc_guid, "ObjServiceCalculation")
# Track pre-simulation SQL
calc._Simulation_presql = "DELETE FROM temp_results"
calc._parse_and_track_sql(calc._Simulation_presql, "Pre-SQL")
# Execute calculation
calc.run()
# Track post-simulation SQL
calc._Simulation_postsql = "UPDATE summary SET calculated = 1"
calc._parse_and_track_sql(calc._Simulation_postsql, "Post-SQL")
calc.disable_table_tracking()
ObjData.py - Parent class for database operationsObjWorkflow.py - Uses simulation tracking for workflow analysisObjServiceCalculation.py - Calculation modules that use trackingServeWorkflow.py - Workflow service that initiates simulationsCheck:
self.get_deployment() == "SIM"self._tracking_enabled == Trueflush_tracked_tables() periodically