Date: 2025-12-14
Target: factory.core/ObjWorkflow.py
Current State: Functional but has significant performance bottlenecks
Current Issue:
# INSIDE the workflow execution loop - runs for EVERY node!
node_dispatch = {
    "API": ObjWorkflowApi(self).execute,
    "SLEEP": ObjWorkflowSleep(self).execute,
    # ... 27 node classes instantiated EVERY iteration
}
Impact:
Every loop iteration instantiates 27 node classes; each instantiation runs __init__(), sets up references, and allocates memory.
Solution:
Move node_dispatch to __init__() as an instance variable:
class Workflow(ObjData):
    def __init__(self, DB=None):
        super().__init__(DB)
        # ... existing init code ...
        # Initialize node dispatch ONCE
        self._node_dispatch = {
            "API": ObjWorkflowApi(self).execute,
            "SLEEP": ObjWorkflowSleep(self).execute,
            "SCHEDULER": ObjWorkflowScheduler(self).execute,
            # ... all 27 nodes
        }
Then in run_workflow():
# Just reference it - no instantiation
if node_up in self._node_dispatch:
    run_context, current_result = self._node_dispatch[node_up](*args, **kwargs)
Expected Speedup: 30-50% for typical workflows
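A quick way to sanity-check this estimate is a micro-benchmark; the sketch below uses a dummy stand-in for the node classes, so the absolute numbers are only illustrative:

import timeit

class DummyNode:
    """Stand-in for a node class such as ObjWorkflowApi."""
    def __init__(self, workflow):
        self.workflow = workflow
    def execute(self):
        pass

workflow = object()
NODE_TYPES = [f"NODE{i}" for i in range(27)]
PREBUILT = {name: DummyNode(workflow).execute for name in NODE_TYPES}

def rebuild_each_time():
    # Current pattern: 27 instantiations per loop iteration
    dispatch = {name: DummyNode(workflow).execute for name in NODE_TYPES}
    return dispatch["NODE0"]

def lookup_prebuilt():
    # Proposed pattern: plain dict lookup
    return PREBUILT["NODE0"]

print("rebuild :", timeit.timeit(rebuild_each_time, number=100_000))
print("prebuilt:", timeit.timeit(lookup_prebuilt, number=100_000))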
Current Issue (Line 208):
def local_patch_param(self, context: dict, guid: str, processguid: str, text: str = "") -> str:
    replacements = {}
    # Builds replacement dict every time
    for key, value in context["result"].items():
        replacements[f"~~{key.lower()}~~"] = str(value)
        replacements[f"${key.lower()}$"] = str(value)
    # ... then does sequential string replace
Impact:
Sequential str.replace() calls are inefficient when many replacements are applied to the same text.
Solution:
Use regex-based batch replacement:
import re

def local_patch_param(self, context: dict, guid: str, processguid: str, text: str = "") -> str:
    if not text:
        return text
    # Build replacements dict (consider caching for repeated contexts)
    replacements = {
        "$guid$": guid,
        "$processguid$": processguid if processguid else guid,
    }
    if "result" in context and isinstance(context["result"], dict):
        for key, value in context["result"].items():
            key_lower = key.lower()
            replacements[f"~~{key_lower}~~"] = str(value)
            replacements[f"${key_lower}$"] = str(value)
    # ... add other replacements ...
    # Single-pass regex replacement (much faster)
    pattern = re.compile("|".join(re.escape(k) for k in replacements.keys()))
    return pattern.sub(lambda m: replacements[m.group(0)], text)
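For illustration, here is the same single-pass substitution run standalone on made-up sample values:

import re

replacements = {"$guid$": "abc-123", "~~status~~": "done"}
pattern = re.compile("|".join(re.escape(k) for k in replacements))
text = "Process $guid$ finished with status ~~status~~"
print(pattern.sub(lambda m: replacements[m.group(0)], text))
# Output: Process abc-123 finished with status done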
Expected Speedup: 15-25% for workflows with heavy parameter substitution
Current Issues:
Context dicts are copied in full for every node even when the node never modifies them, and the full context is built eagerly even when it is never read.
Solutions:
A. Avoid copies where possible; share a reference when the context is not modified:
# Instead of copying the entire context every time:
new_context = copy.copy(context)

# Use the context directly when not modifying it;
# only copy when actually needed:
if needs_modification:
    new_context = context.copy()
else:
    new_context = context  # Just a reference
B. Lazy context building:
class LazyContext:
    """Builds the context dict only when first accessed."""
    def __init__(self, build_context):
        # build_context: zero-argument callable returning the full dict
        self._build = build_context
        self._cache = None

    def get(self):
        if self._cache is None:
            self._cache = self._build()
        return self._cache
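Usage might look like the following, where build_full_context is a hypothetical stand-in for whatever expensive builder currently runs eagerly:

# build_full_context is hypothetical - substitute the real builder
ctx = LazyContext(lambda: build_full_context(node, run_context))
# Only nodes that actually read the context pay the build cost:
data = ctx.get()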
Expected Speedup: 5-10% for large workflows
Current Issue:
self.debug("Node type ", node_type) # Executes every node
self.debug(f"Run a node {node_up}") # String formatting even when disabled
Impact:
Both message strings are built and both debug() calls executed for every node, even when DO_DEBUG = False.
Solution:
# Use conditional logging
if DO_DEBUG:
    self.debug(f"Node type {node_type}")
    self.debug(f"Run a node {node_up}")

# Or implement lazy logging
def debug(self, msg, *args):
    if DO_DEBUG:
        if callable(msg):
            msg = msg()  # Only evaluate if debugging
        self._log(msg, *args)

# Usage:
self.debug(lambda: f"Expensive operation: {expensive_call()}")
Expected Speedup: 3-8% when debug disabled
Current Patterns:
# Line 566-571: Individual queries in prepare_run
sql = self.get_queries("prepare_run_get_workflow_and_rank").format(...)
workflow, rank = self.sql_get_values(sql)
sql = self.get_queries("prepare_run_get_processguidsql").format(...)
process_guid_sql = self.sql_get_value(sql)
sql = self.get_queries("prepare_run_get_nodes").format(...)
nodes = self.sql_get_values(sql)
Solution:
Combine into a single query with a JOIN:
-- Single query returns all needed data
SELECT
    w.WorkflowName,
    w.Rank AS WorkflowRank,
    w.ProcessGuidSql,
    n.WorkflowGuid,
    n.Rank AS NodeRank,
    n.Type
    -- ... all node fields
FROM def_workflows w
LEFT JOIN def_workflow n ON n.WorkflowName = w.WorkflowName
WHERE w.WorkflowName = '{workflow_code}'
ORDER BY n.Rank
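A sketch of how prepare_run might consume the joined rows, splitting the workflow-level columns from the per-node columns; the query key prepare_run_get_all is an assumption, not an existing entry:

# "prepare_run_get_all" is a hypothetical query key for the combined SQL
sql = self.get_queries("prepare_run_get_all").format(workflow_code=workflow_code)
rows = self.sql_get_values(sql)

workflow_name = rank = process_guid_sql = None
nodes = []
for row in rows:
    # Workflow-level columns repeat on every joined row; read them once
    if workflow_name is None:
        workflow_name, rank, process_guid_sql = row[0], row[1], row[2]
    # Columns 3+ describe one node; NULL when the workflow has no nodes
    if row[3] is not None:
        nodes.append(row[3:])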
Expected Speedup: 10-20% for workflow initialization
Current Issue:
workflow_buffer already caches workflow nodes (good!), but derived data such as node_set, branch regex patterns, and parsed node JSON is still rebuilt on every run.
Solution:
# Enhanced caching in workflow_buffer
self._workflow_buffer[workflow_code] = {
    'process_guid': process_guid,
    'nodes': nodes,
    'package': package,
    'archetype': archetype,
    'node_set': node_set,        # Pre-built node_set dict
    'branch_patterns': {},       # Compiled regex patterns for branch SQL
    'parsed_node_data': {}       # Pre-parsed JSON node_data
}
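As an illustration, a parse-once accessor for node_data could look like this; the method name and the raw-JSON argument are assumptions:

import json

def get_node_data(self, workflow_code, node_guid, raw_json):
    """Parse a node's JSON node_data once, then serve it from the buffer."""
    cache = self._workflow_buffer[workflow_code]['parsed_node_data']
    if node_guid not in cache:
        cache[node_guid] = json.loads(raw_json) if raw_json else {}
    return cache[node_guid]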
Expected Speedup: 5-10% for repeated workflow executions
Current Issue (Lines 708-713):
# Rebuilds node_set dict every time workflow runs
for node in nodes:
    (WorkflowGuid, rank, node_type, ...) = node
    node_set[WorkflowGuid.lower()] = (rank, node_type, ...)
Solution:
Build node_set once in prepare_run(), store in workflow_buffer:
def prepare_run(self, workflow_code, guid, context, package, archetype):
    # ... existing code ...
    # Build node_set once
    node_set = {}
    for node in nodes:
        (WorkflowGuid, rank, node_type, node_async, name, ...) = node
        node_set[WorkflowGuid.lower()] = {
            'rank': rank,
            'node_type': node_type,
            'node_async': node_async,
            'name': name,
            # ... etc
        }
    self._workflow_buffer[workflow_code]['node_set'] = node_set
Expected Speedup: 3-5% per workflow execution
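run_workflow can then fetch node metadata with a plain dict lookup instead of re-reading the node rows (a sketch, assuming the buffer layout above):

# In run_workflow(), assuming the buffer layout shown above
node_set = self._workflow_buffer[workflow_code]['node_set']
node_info = node_set.get(node_guid.lower())
if node_info is not None:
    node_type = node_info['node_type']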
Replace:
str(name) + "_" + str(value)
With:
f"{name}_{value}" # Slightly faster
Current:
for k in context.keys():
    v = context[k]  # Separate lookup for every key
    if v is not None and isinstance(v, str):
        # ...
Better:
for k, v in context.items():  # Single iteration, no extra lookups
    if v is not None and isinstance(v, str):
        # ...
.upper() Calls
Current:
node_up = node_type.upper()
if node_up in ["NULL", "TODO", "GATE"]:
    # ...
elif node_up in node_dispatch:
    # ...
Already optimal here - but make sure .upper() is not called repeatedly on the same value elsewhere.
__slots__ for Frequently Created Objects
If any small objects are created in loops, define __slots__:
class NodeContext:
    __slots__ = ['guid', 'name', 'type', 'data']
    def __init__(self, guid, name, type, data):
        self.guid = guid
        self.name = name
        self.type = type
        self.data = data
Saves ~40% memory per instance.
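The exact saving varies with Python version and field count; a rough check (sys.getsizeof is shallow, so the regular instance's __dict__ is added in) looks like this:

import sys

class WithSlots:
    __slots__ = ['guid', 'name']

class WithDict:
    pass

slotted = WithSlots()
regular = WithDict()
regular.guid = 'g'  # attributes live in the per-instance __dict__
print("slots:", sys.getsizeof(slotted))
print("dict :", sys.getsizeof(regular) + sys.getsizeof(regular.__dict__))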
Current: Sequential execution only
Future Enhancement:
import asyncio

async def execute_parallel_nodes(self, nodes, run_context):
    tasks = [
        self.execute_node_async(node, run_context)
        for node in nodes
    ]
    results = await asyncio.gather(*tasks)
    return self.merge_results(results)
Expected Speedup: 2-10x for workflows with parallelizable nodes
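Since each node row already carries a node_async flag (see node_set above), one option is to batch consecutive async-flagged nodes; this is a sketch, and run_node_sync is an assumed synchronous helper:

async def run_nodes(self, ordered_nodes, run_context):
    batch = []
    for node in ordered_nodes:
        if node['node_async']:
            batch.append(node)  # Collect consecutive async-capable nodes
            continue
        if batch:
            # Flush the async batch before the next sequential node
            await self.execute_parallel_nodes(batch, run_context)
            batch = []
        self.run_node_sync(node, run_context)  # Assumed synchronous helper
    if batch:
        await self.execute_parallel_nodes(batch, run_context)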
Phase 1:
- Move node_dispatch to __init__() ← CRITICAL
- Build node_set in prepare_run()
Expected Total: 40-60% speedup
Phase 2:
- Rewrite local_patch_param() with regex
- Combine queries in prepare_run()
Expected Total: Additional 20-30% speedup
Phase 3:
- Async parallel node execution
Expected Total: Additional 50-100% speedup for appropriate workflows
-- Create test workflow with 20 nodes
INSERT INTO def_workflows VALUES ('PERF_TEST', 'core', ...);
-- Mix of node types
INSERT INTO def_workflow VALUES
    ('PERF_TEST', 'core', 10, 'SERVICE', 'node1', ...),
    ('PERF_TEST', 'core', 20, 'CALC', 'node2', ...),
    -- ... 18 more nodes
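A minimal before/after timing harness for this test workflow might look like the following; the run_workflow call signature is an assumption:

import time

def benchmark(workflow, runs=50):
    """Time repeated executions of the PERF_TEST workflow (assumed API)."""
    start = time.perf_counter()
    for _ in range(runs):
        workflow.run_workflow('PERF_TEST')  # signature assumed
    elapsed = time.perf_counter() - start
    print(f"{runs} runs in {elapsed:.3f}s ({elapsed / runs * 1000:.1f} ms/run)")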
Strengths:
Areas for Improvement:
# Profile a workflow execution
python3 -m cProfile -o workflow.prof ServeWorkflow.py
# Analyze results
python3 -m pstats workflow.prof
> sort cumtime
> stats 20
# Line-by-line profiling
kernprof -l -v factory.core/ObjWorkflow.py
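To profile a single workflow call in-process rather than a whole script, cProfile can also be driven programmatically (the context-manager form needs Python 3.8+; the run_workflow call is assumed as above):

import cProfile
import pstats

with cProfile.Profile() as profiler:
    workflow.run_workflow('PERF_TEST')  # assumed call, as in the harness above

stats = pstats.Stats(profiler)
stats.sort_stats('cumtime').print_stats(20)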
Total Expected Speedup: 60-80% with Phase 1+2 optimizations
Critical Fix: Move node_dispatch to __init__() (30-50% gain alone)
Low-Hanging Fruit: Debug conditionals, node_set pre-computation
Future: Async parallel execution for 2-10x gains on parallelizable workflows
Recommendation: Start with Phase 1 (move node_dispatch) - this single change will provide the most significant improvement with minimal risk.