Purpose: Guide for implementing comprehensive performance metrics tracking in any module
Reference Implementation: ObjDataImport (factory.core/ObjDataImport.py)
Date: 2025-12-28
This roadmap documents the performance metrics pattern successfully implemented in ObjDataImport. The pattern is designed to be replicated across all major modules for consistent performance monitoring.
┌─────────────────────────────────────────────────────────┐
│ 1. In-Memory Metrics (Session Statistics) │
│ - Class-level aggregation │
│ - Fast, no I/O overhead │
│ - Useful for real-time monitoring │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 2. Database Persistence (Historical Analysis) │
│ - def_<module>_metric table │
│ - Per-operation detailed metrics │
│ - Queryable, reportable, archivable │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 3. YAML Schema Definition (Infrastructure as Code) │
│ - Version controlled │
│ - Auto-created via create_tables_from_yaml() │
│ - Consistent collation with {collation} placeholder │
└─────────────────────────────────────────────────────────┘
1.1 Identify Key Operations
Example (ObjDataImport):
# Primary operation: import_file()
# Metrics needed: rows/sec, file size, memory, success rate
1.2 Define Metrics to Track
Core metrics (apply to all modules):
Module-specific metrics:
1.3 Choose Primary Key
Standard pattern:
PRIMARY KEY (
    <ModuleCode>,     -- Operation identifier (e.g., DataimportCode)
    Package,          -- Package/tenant identifier
    AxionImportTime,  -- Timestamp of operation
    AxionImportGuid   -- Unique session GUID
)
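The key components can be sketched in Python: `uuid.uuid4()` produces the 36-character GUID that matches the `varchar(36)` column, and the timestamp uses a DATETIME-compatible format. `make_metric_key` is a hypothetical helper, not part of the reference implementation.

```python
import uuid
from datetime import datetime

def make_metric_key(module_code: str, package: str) -> dict:
    """Build the four primary-key components for one metrics row.

    Sketch: module_code and package come from the caller; the GUID
    and timestamp are generated fresh for each operation.
    """
    return {
        "ModuleCode": module_code,
        "Package": package,
        # MySQL DATETIME-compatible string
        "AxionImportTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        # str(uuid4()) is exactly 36 characters -> fits varchar(36)
        "AxionImportGuid": str(uuid.uuid4()),
    }

key = make_metric_key("DI-001", "core")
```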
2.1 Create Metrics Table YAML
Location: Add to your module's YAML file (e.g., factory.core/ObjYourModule.yaml)
Template:
database:
  schema:
    def_<module>_metric: |
      CREATE TABLE `def_<module>_metric` (
        -- Identity columns (standard across all modules)
        `<ModuleCode>` varchar(255) NOT NULL COMMENT 'Operation identifier',
        `Package` varchar(100) NOT NULL COMMENT 'Package identifier',
        `AxionImportTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Operation timestamp',
        `AxionImportGuid` varchar(36) NOT NULL COMMENT 'Unique session GUID',
        -- Operation-specific identifiers
        `OperationName` varchar(255) DEFAULT NULL COMMENT 'Name of operation performed',
        `TargetEntity` varchar(255) DEFAULT NULL COMMENT 'Target of operation',
        -- Performance metrics (adapt to your module)
        `ItemsProcessed` int(11) DEFAULT 0 COMMENT 'Number of items processed',
        `ElapsedSeconds` decimal(10,3) DEFAULT NULL COMMENT 'Time taken in seconds',
        `ItemsPerSecond` decimal(12,2) DEFAULT NULL COMMENT 'Throughput (calculated)',
        `MemoryUsageMB` decimal(12,2) DEFAULT NULL COMMENT 'Peak memory usage in MB',
        -- Resource metrics (optional, add as needed)
        `FileSizeMB` decimal(12,2) DEFAULT NULL COMMENT 'File size if applicable',
        `CacheHits` int(11) DEFAULT 0 COMMENT 'Cache hits if applicable',
        `APICallsMade` int(11) DEFAULT 0 COMMENT 'External API calls if applicable',
        -- Status and error tracking
        `OperationSuccess` char(1) DEFAULT 'N' COMMENT 'Y/N - Whether operation succeeded',
        `ErrorMessage` text DEFAULT NULL COMMENT 'Error message if failed',
        `ErrorCategory` varchar(50) DEFAULT NULL COMMENT 'Error category (TIMEOUT, IO_ERROR, etc.)',
        -- Indexes for common queries
        PRIMARY KEY (`<ModuleCode>`, `Package`, `AxionImportTime`, `AxionImportGuid`),
        KEY `idx_operation_time` (`AxionImportTime`),
        KEY `idx_success` (`OperationSuccess`),
        KEY `idx_module_recent` (`<ModuleCode>`, `Package`, `AxionImportTime`),
        KEY `idx_performance` (`ItemsPerSecond`, `ElapsedSeconds`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE={collation}
      COMMENT='Performance metrics for <module> operations';
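The {collation} placeholder substitution that create_tables_from_yaml() is described as performing can be sketched as a plain string replacement. This is an illustrative sketch, not the actual implementation; the collation name is an assumed example.

```python
def render_schema(schema_sql: str, collation: str = "utf8mb4_unicode_ci") -> str:
    """Replace the {collation} placeholder at "deploy time" (sketch only;
    the real create_tables_from_yaml() may work differently)."""
    return schema_sql.replace("{collation}", collation)

template = (
    "CREATE TABLE `def_demo_metric` (`Id` int NOT NULL) "
    "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE={collation}"
)
sql = render_schema(template)
```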
2.2 Important Schema Considerations
- Use ENGINE=InnoDB (not Aria or MyISAM)
- Keep the {collation} placeholder (it is replaced at runtime)
- Use decimal for metrics requiring precision (time, throughput)
- Use int for counts (rows, items, cache hits)
3.1 Create Metrics Class (In-Memory Aggregation)
Add to your module file (e.g., ObjYourModule.py):
import time


class ModuleMetrics:
    """Tracks operation performance metrics across all instances."""

    def __init__(self):
        self.total_operations = 0
        self.total_items = 0
        self.total_time = 0.0
        self.errors = 0
        self.start_time = time.time()

    def record_operation(self, items: int, elapsed: float, success: bool = True):
        """Record metrics from a completed operation."""
        self.total_operations += 1
        self.total_items += items
        self.total_time += elapsed
        if not success:
            self.errors += 1

    def get_stats(self) -> dict:
        """Get current performance statistics."""
        uptime = time.time() - self.start_time
        avg_rate = self.total_items / self.total_time if self.total_time > 0 else 0
        error_rate = (self.errors / self.total_operations * 100) if self.total_operations > 0 else 0
        return {
            'total_operations': self.total_operations,
            'total_items': self.total_items,
            'total_time': self.total_time,
            'avg_rate': avg_rate,
            'errors': self.errors,
            'error_rate': error_rate,
            'uptime': uptime
        }

    def print_stats(self):
        """Print formatted statistics to console."""
        stats = self.get_stats()
        print(f"\n{'=' * 60}")
        print("Module Performance Statistics")
        print(f"{'=' * 60}")
        print(f"Total Operations: {stats['total_operations']:,}")
        print(f"Total Items: {stats['total_items']:,}")
        print(f"Total Time: {stats['total_time']:.2f}s")
        print(f"Average Rate: {stats['avg_rate']:,.2f} items/sec")
        print(f"Errors: {stats['errors']} ({stats['error_rate']:.2f}%)")
        print(f"Uptime: {stats['uptime']:.2f}s")
        print(f"{'=' * 60}\n")
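A quick usage example of the aggregation arithmetic. The class body here is a condensed copy of the one above so the snippet runs standalone; only record_operation() and get_stats() are reproduced.

```python
import time

class ModuleMetrics:
    """Condensed copy of the class above, for a self-contained demo."""

    def __init__(self):
        self.total_operations = 0
        self.total_items = 0
        self.total_time = 0.0
        self.errors = 0
        self.start_time = time.time()

    def record_operation(self, items, elapsed, success=True):
        self.total_operations += 1
        self.total_items += items
        self.total_time += elapsed
        if not success:
            self.errors += 1

    def get_stats(self):
        return {
            "avg_rate": self.total_items / self.total_time if self.total_time > 0 else 0,
            "error_rate": (self.errors / self.total_operations * 100)
                          if self.total_operations > 0 else 0,
        }

m = ModuleMetrics()
m.record_operation(items=500, elapsed=2.0)
m.record_operation(items=100, elapsed=1.0, success=False)
stats = m.get_stats()
# 600 items over 3.0 s -> 200 items/sec; 1 failure of 2 ops -> 50% error rate
```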
3.2 Add Metrics Class to Module
class ObjYourModule(ObjSupervisor.Supervisor):
    """Your module class."""

    # Class-level performance metrics (shared across all instances)
    _metrics = ModuleMetrics()

    def __init__(self, ...):
        # Your existing initialization
        pass
3.3 Track Metrics in Main Operation
Pattern for your primary operation method:
def your_operation(self, operation_params) -> bool:
    """Main operation method with metrics tracking."""
    operation_start_time = time.time()
    self.successful = False
    # Generate unique session GUID
    operation_session_guid = self.get_uuid()
    # Track starting memory (if applicable)
    memory_usage_mb = None
    # Initialize before try so the finally block never sees it unbound
    items_processed = 0
    try:
        # === YOUR OPERATION LOGIC HERE ===
        # ... your operation code ...
        self.successful = True
    except Exception as e:
        self.error(f"Operation failed: {e}")
        self.successful = False
    finally:
        # Calculate elapsed time
        operation_elapsed = time.time() - operation_start_time
        # Track memory usage (psutil is optional)
        try:
            import psutil
            process = psutil.Process()
            memory_usage_mb = round(process.memory_info().rss / (1024 * 1024), 2)
        except Exception as e:  # covers ImportError if psutil is missing
            self.debug(f"Could not get memory usage: {e}")
        # Record to in-memory metrics
        self.__class__._metrics.record_operation(
            items=items_processed,
            elapsed=operation_elapsed,
            success=self.successful
        )
        # Persist to database
        self.write_metrics_to_db(
            operation_name='your_operation',
            session_guid=operation_session_guid,
            items_processed=items_processed,
            elapsed_seconds=operation_elapsed,
            memory_usage_mb=memory_usage_mb,
            success=self.successful,
            error_message="; ".join(self.failure_reasons) if self.failure_reasons else None
        )
    return self.successful
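If several operations need the same start/stop/record scaffolding, a decorator can centralize it. This is a sketch, not part of the reference implementation: `_metrics` and `items_processed` follow the naming used above, and the `_Recorder` stub stands in for ModuleMetrics.

```python
import functools
import time

def tracked(operation_name):
    """Wrap a method so elapsed time and success are recorded automatically.

    Sketch: assumes the instance exposes `_metrics.record_operation()` and
    that the wrapped method sets `self.items_processed`. `operation_name`
    could additionally be forwarded to a database write.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            start = time.time()
            success = False
            try:
                result = fn(self, *args, **kwargs)
                success = bool(result)
                return result
            finally:
                self._metrics.record_operation(
                    items=getattr(self, "items_processed", 0),
                    elapsed=time.time() - start,
                    success=success,
                )
        return wrapper
    return decorator

# --- demo with a stub recorder ---
class _Recorder:
    def __init__(self):
        self.calls = []
    def record_operation(self, items, elapsed, success):
        self.calls.append((items, success))

class Demo:
    _metrics = _Recorder()

    @tracked("import")
    def run(self):
        self.items_processed = 42
        return True

Demo().run()
```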
3.4 Implement write_metrics_to_db Method
def write_metrics_to_db(
    self,
    operation_name: str,
    session_guid: str,
    items_processed: int,
    elapsed_seconds: float,
    memory_usage_mb: float = None,
    success: bool = True,
    error_message: str = None,
    **additional_metrics  # Allow module-specific metrics
):
    """
    Persists operation metrics to def_<module>_metric table.

    Args:
        operation_name: Name of the operation performed
        session_guid: Unique GUID for this operation session
        items_processed: Number of items processed
        elapsed_seconds: Time taken for operation
        memory_usage_mb: Peak memory usage in MB
        success: Whether operation succeeded
        error_message: Error message if failed
        **additional_metrics: Module-specific metrics (file_size, cache_hits, etc.)
    """
    try:
        # Ensure required attributes are set
        if not hasattr(self, '_YourModuleCode') or not self._YourModuleCode:
            self.debug("Warning: Cannot write metrics - ModuleCode not set")
            return

        # Helper function to quote SQL values
        def quote_value(val):
            if val is None:
                return 'NULL'
            elif isinstance(val, (int, float)):
                return str(val)
            else:
                # Escape single quotes by doubling them, then wrap in quotes
                return "'{}'".format(str(val).replace("'", "''"))

        # Calculate throughput
        items_per_second = None
        if elapsed_seconds > 0:
            items_per_second = round(items_processed / elapsed_seconds, 2)

        # Prepare values
        operation_success = 'Y' if success else 'N'

        # Truncate error message if too long
        if error_message and len(error_message) > 5000:
            error_message = error_message[:5000] + "... (truncated)"

        # Build SQL with base columns
        sql = f"""
            INSERT INTO def_<module>_metric (
                <ModuleCode>,
                Package,
                AxionImportTime,
                AxionImportGuid,
                OperationName,
                ItemsProcessed,
                ElapsedSeconds,
                ItemsPerSecond,
                MemoryUsageMB,
                OperationSuccess,
                ErrorMessage
            ) VALUES (
                {quote_value(self._YourModuleCode)},
                {quote_value(self.get_package())},
                NOW(),
                {quote_value(session_guid)},
                {quote_value(operation_name)},
                {quote_value(items_processed)},
                {quote_value(round(elapsed_seconds, 3))},
                {quote_value(items_per_second)},
                {quote_value(memory_usage_mb)},
                {quote_value(operation_success)},
                {quote_value(error_message)}
            )
        """
        self.sql_execute(sql)
        self.debug(f"Metrics written to def_<module>_metric for {operation_name}")
    except Exception as e:
        # Don't fail the operation if metrics writing fails
        self.error(f"Warning: Could not write metrics to database: {e}")
        import traceback
        self.error(f"Traceback: {traceback.format_exc()}")
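A quick check of quote_value's escaping rules, with the helper copied from above so the snippet runs standalone. Note that if your database layer supports parameterized queries, those are generally preferable to string interpolation; the helper is a fallback for layers that only accept raw SQL.

```python
def quote_value(val):
    """Same rules as the helper above: NULL for None, bare numbers,
    and strings with single quotes doubled (standard SQL escaping)."""
    if val is None:
        return 'NULL'
    elif isinstance(val, (int, float)):
        return str(val)
    else:
        return "'{}'".format(str(val).replace("'", "''"))

print(quote_value(None))    # NULL
print(quote_value(3.5))     # 3.5
print(quote_value("it's"))  # 'it''s'
```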
4.1 Create Comprehensive Test Suite
Location: resource.test/pytests/factory.core/test_ObjYourModule_metrics.py
"""
Comprehensive Test Suite for Module Metrics
Tests various operations, sizes, and edge cases
"""
import os
import sys
import pytest
from pathlib import Path
base_path = os.getcwd()
paths = [
"",
"/factory.core",
# Add your module paths
]
for relative_path in paths:
sys.path.append(base_path + relative_path)
from ObjYourModule import ObjYourModule
class TestModuleMetrics:
"""Comprehensive tests for module metrics functionality"""
@classmethod
def setup_class(cls):
"""Setup test environment once for all tests"""
cls.module = ObjYourModule()
def teardown_method(self):
"""Cleanup after each test"""
# Clean up test data
pass
def test_01_small_operation(self):
"""Test metrics for small operation"""
# Run small operation
result = self.module.your_operation(test_param="small")
assert result is True, "Operation should succeed"
# Verify metrics were written
metrics = self.module.sql_get_list("""
SELECT ItemsProcessed, ElapsedSeconds, MemoryUsageMB, OperationSuccess
FROM def_<module>_metric
WHERE OperationName = 'your_operation'
ORDER BY AxionImportTime DESC
LIMIT 1
""")
assert len(metrics) > 0, "Metrics should be recorded"
assert metrics[0][3] == 'Y', "Success should be Y"
def test_02_large_operation(self):
"""Test metrics for large operation"""
# Run large operation
result = self.module.your_operation(test_param="large")
# Verify performance is acceptable
metrics = self.module.sql_get_list("""
SELECT ItemsProcessed, ElapsedSeconds, ItemsPerSecond
FROM def_<module>_metric
WHERE OperationName = 'your_operation'
ORDER BY AxionImportTime DESC
LIMIT 1
""")
items_per_sec = metrics[0][2]
assert items_per_sec > 1000, f"Throughput should be > 1000/sec, got {items_per_sec}"
def test_03_memory_tracking(self):
"""Test memory usage tracking"""
# Run operation
self.module.your_operation(test_param="memory_test")
# Verify memory was tracked
metrics = self.module.sql_get_list("""
SELECT MemoryUsageMB
FROM def_<module>_metric
ORDER BY AxionImportTime DESC
LIMIT 1
""")
assert metrics[0][0] is not None, "Memory usage should be tracked"
assert metrics[0][0] > 0, "Memory usage should be positive"
def test_04_error_tracking(self):
"""Test error message tracking"""
# Run operation that will fail
result = self.module.your_operation(test_param="invalid")
assert result is False, "Operation should fail"
# Verify error was recorded
metrics = self.module.sql_get_list("""
SELECT OperationSuccess, ErrorMessage
FROM def_<module>_metric
ORDER BY AxionImportTime DESC
LIMIT 1
""")
assert metrics[0][0] == 'N', "Success should be N for failed operation"
assert metrics[0][1] is not None, "Error message should be recorded"
if __name__ == "__main__":
pytest.main([__file__, "-v", "-s"])
4.2 Create Benchmark Script
Location: resource.test/benchmark_<module>.py
"""
Performance Benchmarking for Module Optimizations
Measures actual performance improvements
"""
import os
import sys
import time
from pathlib import Path
base_path = os.getcwd()
# Add paths...
from ObjYourModule import ObjYourModule
class ModuleBenchmark:
"""Benchmark module performance"""
def __init__(self):
self.module = ObjYourModule()
self.results = []
def benchmark_operation(self, operation_name: str, params: dict, iterations: int = 3):
"""
Benchmark a single operation.
Args:
operation_name: Name of operation to benchmark
params: Parameters to pass to operation
iterations: Number of test iterations
"""
print(f"\n{'=' * 80}")
print(f"Benchmark: {operation_name}")
print(f"Iterations: {iterations}")
print(f"{'=' * 80}")
times = []
items_counts = []
for i in range(iterations):
# Run timed operation
start = time.time()
self.module.your_operation(**params)
elapsed = time.time() - start
# Get item count (adapt to your module)
items = params.get('items_count', 0)
times.append(elapsed)
items_counts.append(items)
items_per_sec = items / elapsed if elapsed > 0 else 0
print(f" Iteration {i+1}: {elapsed:.2f}s | {items:,} items | {items_per_sec:,.0f} items/sec")
# Calculate statistics
avg_time = sum(times) / len(times)
avg_items = sum(items_counts) / len(items_counts)
avg_rate = avg_items / avg_time if avg_time > 0 else 0
result = {
"operation": operation_name,
"avg_time": avg_time,
"avg_items": avg_items,
"avg_rate": avg_rate,
}
self.results.append(result)
print(f"\n Average: {avg_time:.2f}s | {avg_items:,.0f} items | {avg_rate:,.0f} items/sec")
def print_summary(self):
"""Print comprehensive benchmark summary"""
print(f"\n\n{'=' * 80}")
print("BENCHMARK SUMMARY")
print(f"{'=' * 80}\n")
print(f"{'Operation':<30} {'Items':<12} {'Time (s)':<12} {'Rate':<12}")
print(f"{'-' * 80}")
for result in self.results:
print(
f"{result['operation']:<30} "
f"{result['avg_items']:>10,.0f} "
f"{result['avg_time']:>10.2f} "
f"{result['avg_rate']:>10,.0f}"
)
def main():
"""Run comprehensive benchmarks"""
benchmark = ModuleBenchmark()
# Run benchmarks for different operation sizes
benchmarks_to_run = [
("Small Operation", {"items_count": 100}),
("Medium Operation", {"items_count": 1000}),
("Large Operation", {"items_count": 10000}),
]
for name, params in benchmarks_to_run:
benchmark.benchmark_operation(name, params, iterations=3)
benchmark.print_summary()
if __name__ == "__main__":
main()
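Averages across a handful of iterations can hide variance, so median and standard deviation are worth reporting alongside the mean. A sketch of extra statistics that print_summary() could include, using only the stdlib statistics module (not part of the reference implementation):

```python
import statistics

def summarize(times):
    """Mean alone hides outliers; median and stdev flag noisy runs."""
    return {
        "mean": statistics.mean(times),
        "median": statistics.median(times),
        "stdev": statistics.stdev(times) if len(times) > 1 else 0.0,
    }

# One slow outlier (5.0 s) pulls the mean well above the median
s = summarize([1.0, 1.2, 5.0])
```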
4.3 Run Tests
# Run comprehensive tests
pytest resource.test/pytests/factory.core/test_ObjYourModule_metrics.py -v -s
# Run benchmarks
python resource.test/benchmark_<module>.py
Example: Workflow Module
Key Metrics: steps completed/failed, retry attempts, execution mode (parallel/sequential), elapsed time, memory usage
Schema:
CREATE TABLE `def_workflow_metric` (
    `WorkflowCode` varchar(255) NOT NULL,
    `Package` varchar(100) NOT NULL,
    `AxionImportTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `AxionImportGuid` varchar(36) NOT NULL,
    `WorkflowName` varchar(255) DEFAULT NULL,
    `StepsCompleted` int(11) DEFAULT 0,
    `StepsFailed` int(11) DEFAULT 0,
    `ElapsedSeconds` decimal(10,3) DEFAULT NULL,
    `MemoryUsageMB` decimal(12,2) DEFAULT NULL,
    `RetryAttempts` int(11) DEFAULT 0,
    `ExecutionMode` varchar(20) DEFAULT NULL COMMENT 'PARALLEL, SEQUENTIAL',
    `OperationSuccess` char(1) DEFAULT 'N',
    `ErrorMessage` text DEFAULT NULL,
    PRIMARY KEY (`WorkflowCode`, `Package`, `AxionImportTime`, `AxionImportGuid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE={collation};
Example: API Endpoint Module
Key Metrics: HTTP method and status code, request/response sizes, latency, cache hit rate
Schema:
CREATE TABLE `def_api_metric` (
    `EndpointCode` varchar(255) NOT NULL,
    `Package` varchar(100) NOT NULL,
    `AxionImportTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `AxionImportGuid` varchar(36) NOT NULL,
    `HTTPMethod` varchar(10) DEFAULT NULL COMMENT 'GET, POST, PUT, DELETE',
    `Endpoint` varchar(500) DEFAULT NULL,
    `StatusCode` int(11) DEFAULT NULL,
    `RequestSizeKB` decimal(12,2) DEFAULT NULL,
    `ResponseSizeKB` decimal(12,2) DEFAULT NULL,
    `ElapsedSeconds` decimal(10,3) DEFAULT NULL,
    `CacheHit` char(1) DEFAULT 'N',
    `OperationSuccess` char(1) DEFAULT 'N',
    `ErrorMessage` text DEFAULT NULL,
    PRIMARY KEY (`EndpointCode`, `Package`, `AxionImportTime`, `AxionImportGuid`),
    KEY `idx_status` (`StatusCode`),
    KEY `idx_endpoint` (`Endpoint`(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE={collation};
Example: Notification Module
Key Metrics: delivery channel, recipient/delivered/failed counts, queue depth, elapsed time
Schema:
CREATE TABLE `def_notify_metric` (
    `NotifyCode` varchar(255) NOT NULL,
    `Package` varchar(100) NOT NULL,
    `AxionImportTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `AxionImportGuid` varchar(36) NOT NULL,
    `Channel` varchar(50) DEFAULT NULL COMMENT 'EMAIL, SMS, PUSH, WEBHOOK',
    `RecipientCount` int(11) DEFAULT 0,
    `DeliveredCount` int(11) DEFAULT 0,
    `FailedCount` int(11) DEFAULT 0,
    `ElapsedSeconds` decimal(10,3) DEFAULT NULL,
    `QueueDepth` int(11) DEFAULT NULL,
    `OperationSuccess` char(1) DEFAULT 'N',
    `ErrorMessage` text DEFAULT NULL,
    PRIMARY KEY (`NotifyCode`, `Package`, `AxionImportTime`, `AxionImportGuid`),
    KEY `idx_channel` (`Channel`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE={collation};
The {collation} placeholder in these schemas is resolved at runtime via get_collation().
Minimize Overhead:
# GOOD: Single memory check at end
memory_mb = psutil.Process().memory_info().rss / (1024 * 1024)

# BAD: Multiple checks during operation
for item in items:
    memory_mb = psutil.Process().memory_info().rss / (1024 * 1024)  # Too frequent!
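A middle ground between the two extremes above is to sample memory at most once every N items. This is a sketch: the memory reading is stubbed out so the snippet runs anywhere; in practice it would be the psutil call from the GOOD example.

```python
def process_with_sampling(items, sample_every=1000):
    """Process items, reading memory at most once per `sample_every` items.

    Returns the number of samples taken (sketch: the reading itself is
    stubbed; replace the marked line with a real psutil call).
    """
    samples = 0
    for i, _ in enumerate(items):
        if i % sample_every == 0:
            samples += 1
            # memory_mb = psutil.Process().memory_info().rss / (1024 * 1024)
        # ... process the item ...
    return samples

# 5000 items sampled every 1000 -> readings at i = 0, 1000, 2000, 3000, 4000
n = process_with_sampling(range(5000), sample_every=1000)
```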
Batch Cleanup:
# GOOD: Periodic cleanup every 100 operations
if self.__class__._cleanup_counter % 100 == 0:
    self.cleanup_old_metrics(days=90)

# BAD: Cleanup after every operation
self.cleanup_old_metrics(days=90)  # Too expensive!
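The periodic-cleanup guard above can be sketched end to end. `cleanup` stands in for the real cleanup_old_metrics() call; the counter lives on the class so it is shared across instances, matching the class-level `_metrics` pattern.

```python
class MetricsCleanup:
    """Run cleanup once per 100 operations, using a class-level counter."""

    _cleanup_counter = 0

    def after_operation(self, cleanup):
        cls = self.__class__
        cls._cleanup_counter += 1
        if cls._cleanup_counter % 100 == 0:
            cleanup()  # e.g. self.cleanup_old_metrics(days=90)

calls = []
mc = MetricsCleanup()
for _ in range(250):
    mc.after_operation(lambda: calls.append(1))
# 250 operations -> cleanup fires at counts 100 and 200, i.e. twice
```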
Async Metrics (Advanced):
# For high-frequency operations, consider async metrics
import asyncio

async def write_metrics_async(self, ...):
    # Schedule the write as a task WITHOUT awaiting it,
    # so the caller is not blocked on the database
    asyncio.create_task(self._async_write(...))
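If the module does not run inside an event loop, a background thread draining a queue.Queue achieves the same non-blocking effect. A sketch, with `persist` standing in for the real database write (e.g. sql_execute):

```python
import queue
import threading

class MetricsWriter:
    """Background-thread metrics writer (sketch): callers enqueue rows and
    return immediately; a daemon thread drains the queue and persists."""

    def __init__(self, persist):
        self._q = queue.Queue()
        self._persist = persist
        threading.Thread(target=self._drain, daemon=True).start()

    def submit(self, row: dict):
        self._q.put(row)  # non-blocking for the caller

    def _drain(self):
        while True:
            row = self._q.get()
            if row is None:  # sentinel for shutdown
                break
            self._persist(row)
            self._q.task_done()

    def close(self):
        self._q.put(None)

written = []
w = MetricsWriter(written.append)
w.submit({"items": 10})
w.submit({"items": 20})
w._q.join()  # demo only: wait until both rows are persisted
```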
Track the success of metrics implementation:
- factory.core/ObjDataImport.py
- factory.core/ObjDataImport.yaml
- resource.test/pytests/factory.core/test_ObjDataImport_comprehensive.py
- resource.test/benchmark_dataimport.py
Find slowest operations:
SELECT
    <ModuleCode>,
    OperationName,
    AVG(ElapsedSeconds) AS AvgTime,
    AVG(ItemsPerSecond) AS AvgRate,
    COUNT(*) AS Operations
FROM def_<module>_metric
WHERE AxionImportTime >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY <ModuleCode>, OperationName
ORDER BY AvgTime DESC
LIMIT 10;
Memory usage trends:
SELECT
    DATE(AxionImportTime) AS Date,
    AVG(MemoryUsageMB) AS AvgMemory,
    MAX(MemoryUsageMB) AS PeakMemory
FROM def_<module>_metric
WHERE AxionImportTime >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY DATE(AxionImportTime)
ORDER BY Date;
Error rate by operation:
SELECT
    OperationName,
    COUNT(*) AS TotalOps,
    SUM(CASE WHEN OperationSuccess = 'N' THEN 1 ELSE 0 END) AS Failures,
    ROUND(SUM(CASE WHEN OperationSuccess = 'N' THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS ErrorRate
FROM def_<module>_metric
WHERE AxionImportTime >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY OperationName
HAVING ErrorRate > 1.0
ORDER BY ErrorRate DESC;
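Rows returned by these queries can also be post-processed in Python, for example to report a 95th-percentile latency, which AVG hides. A sketch using the stdlib statistics module on a list of ElapsedSeconds values:

```python
import statistics

def p95(latencies):
    """95th-percentile latency from ElapsedSeconds values.

    statistics.quantiles with n=20 returns 19 cut points; the last one
    is the 95th percentile.
    """
    return statistics.quantiles(latencies, n=20)[-1]

# For latencies 1..100 seconds, p95 lands between 95 and 96
v = p95(list(range(1, 101)))
```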
Document Version: 1.0
Last Updated: 2025-12-28
Next Review: 2026-03-28
Owner: Engineering Team