File: factory.core/ObjWorkflow.py
Lines: 2,358
Version: 8.0
Analysis Date: 2025-12-14
ObjWorkflow.py is a critical component managing workflow orchestration in Axion. While functionally complete, the codebase exhibits several anti-patterns, including excessive global variable usage, inconsistent exception handling, and missed refactoring opportunities that would improve maintainability.
Priority Levels: 🔴 High | 🟡 Medium | 🟢 Low
The module uses multiple global dictionaries and flags:
```python
WORKFLOW_BUFFER = dict()
IS_WORKFLOW_ASYNC = dict()
IS_WORKFLOW_GUI = dict()
WORKFLOW_BUFFER_OBJ = dict()
CREATE_TABLE_CHECK = None
```
Lines: 92-97, 115, 261, 274, 331
Option 1: Move to Instance Variables
```python
class Workflow(ObjApi.ObjApi):
    def __init__(self, DB=0, Page=0):
        super().__init__(DB)
        self._workflow_buffer = {}
        self._is_workflow_async = {}
        self._is_workflow_gui = {}
        self._workflow_buffer_obj = {}
        self._create_table_check = False
```
Option 2: Use Class Variables with Locks
```python
import threading

class Workflow(ObjApi.ObjApi):
    _workflow_buffer_lock = threading.RLock()
    _workflow_buffer = {}

    @classmethod
    def get_workflow_buffer(cls, key):
        with cls._workflow_buffer_lock:
            return cls._workflow_buffer.get(key)
```
Option 3: Use External State Manager
```python
# factory.core/ObjWorkflowState.py
from dataclasses import dataclass, field
from typing import Dict
import threading

@dataclass
class WorkflowState:
    buffer: Dict = field(default_factory=dict)
    async_flags: Dict = field(default_factory=dict)
    gui_flags: Dict = field(default_factory=dict)
    _lock: threading.RLock = field(default_factory=threading.RLock)

    def set_workflow_async(self, workflow_code: str, is_async: bool):
        with self._lock:
            self.async_flags[workflow_code] = is_async
```
Lines: 71, 501, 557, 561, 565, 1116
```python
# Line 71 - Silent failure during import
except BaseException as e:
    print(f"ObjServiceClassification not implemented {e}")
    pass
```
Issue: Catches system exceptions (KeyboardInterrupt, SystemExit) which should propagate.
Fix:
```python
try:
    import ObjServiceClassification
except (ImportError, ModuleNotFoundError) as e:
    self.error(f"ObjServiceClassification not available: {e}")
    ObjServiceClassification = None
```
Line: 1279 (commented out but shows pattern)
```python
# except:
```
Fix: Always specify exception types and log appropriately.
Lines: 501, 557, 561, 565
```python
except BaseException:
    pass  # Silent failure
```
Fix:
```python
except (ValueError, KeyError) as e:
    self.debug(f"Failed to process node: {e}")
    # Handle gracefully or re-raise
```
Replace `BaseException` with specific exception types, and use the `from` clause to preserve the stack trace:
```python
except ValueError as e:
    self.error(f"Invalid value in workflow {workflow_code}")
    raise WorkflowError("Workflow validation failed") from e
```
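Note that `WorkflowError` is not defined in ObjWorkflow.py today; a minimal sketch of the exception hierarchy the fix assumes (the `check_retry_count` helper is purely illustrative):

```python
class WorkflowError(Exception):
    """Base error for workflow orchestration failures (hypothetical)."""

class WorkflowValidationError(WorkflowError):
    """Raised when a workflow definition or context fails validation."""

def check_retry_count(value):
    """Illustrative helper showing 'raise ... from' chaining."""
    try:
        count = int(value)
    except ValueError as e:
        # __cause__ keeps the original traceback attached
        raise WorkflowValidationError("Workflow validation failed") from e
    return count
```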
The run_workflow() method (line 574) contains 25+ nested functions (node_api, node_sleep, node_scheduler, etc.).
Problems: the 25+ nested functions cannot be unit-tested in isolation, they close over shared local state, and they push the method past 1,000 lines with very high cyclomatic complexity.
Recommendation: Extract to separate methods or a strategy pattern
```python
class NodeHandler:
    """Base class for workflow node handlers"""
    def execute(self, run_context: dict, current_result: str,
                input_guid: str, node_type: str, name: str) -> tuple:
        raise NotImplementedError

class ApiNodeHandler(NodeHandler):
    def execute(self, run_context, current_result, input_guid, node_type, name):
        # Implementation from node_api
        ...

class WorkflowNodeRegistry:
    """Registry pattern for node handlers"""
    def __init__(self):
        self._handlers = {
            'API': ApiNodeHandler(),
            'SLEEP': SleepNodeHandler(),
            'SCHEDULER': SchedulerNodeHandler(),
            # ... etc
        }

    def get_handler(self, node_type: str) -> NodeHandler:
        return self._handlers.get(node_type)
```
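A runnable sketch of the dispatch this registry enables; the handler body below is illustrative, not the real `node_sleep` logic:

```python
class NodeHandler:
    def execute(self, run_context, current_result, input_guid, node_type, name):
        raise NotImplementedError

class SleepNodeHandler(NodeHandler):
    # Illustrative stand-in for the real node_sleep logic
    def execute(self, run_context, current_result, input_guid, node_type, name):
        run_context.setdefault("trace", []).append(f"executed:{name}")
        return current_result, ""

class WorkflowNodeRegistry:
    def __init__(self):
        self._handlers = {"SLEEP": SleepNodeHandler()}

    def get_handler(self, node_type):
        return self._handlers.get(node_type)

# run_workflow() then collapses its if/elif chain into a lookup:
registry = WorkflowNodeRegistry()
handler = registry.get_handler("SLEEP")
context = {}
result, error = handler.execute(context, "prev_result", "guid-1", "SLEEP", "pause")
```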
Lines: 97, 1466
```python
WORKFLOW_KICKOFF = "'GUI','FORM','IMPORT','API','WEBHOOK','SCHEDULER','REFLOW','ALERT','EVENT','SERVICE','START'"
```
Recommendation: Use Enum
```python
from enum import Enum

class WorkflowKickoffType(Enum):
    GUI = "GUI"
    FORM = "FORM"
    IMPORT = "IMPORT"
    API = "API"
    WEBHOOK = "WEBHOOK"
    SCHEDULER = "SCHEDULER"
    REFLOW = "REFLOW"
    ALERT = "ALERT"
    EVENT = "EVENT"
    SERVICE = "SERVICE"
    START = "START"

    @classmethod
    def all_values(cls):
        return [e.value for e in cls]

    @classmethod
    def sql_list(cls):
        return ",".join(f"'{e.value}'" for e in cls)
```
Usage:
```python
sql = f"WHERE type IN ({WorkflowKickoffType.sql_list()})"
```
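A quick check of the helpers, using a trimmed three-member enum so the expected SQL fragment stays short; membership tests then replace ad-hoc comparisons against the CSV constant:

```python
from enum import Enum

class KickoffDemo(Enum):
    # Trimmed to three members for the demo; the real enum has eleven.
    GUI = "GUI"
    SCHEDULER = "SCHEDULER"
    START = "START"

    @classmethod
    def sql_list(cls):
        return ",".join(f"'{e.value}'" for e in cls)

def is_kickoff(node_type: str) -> bool:
    # Set membership instead of substring checks on the CSV string
    return node_type in {e.value for e in KickoffDemo}
```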
Partial type hints exist but inconsistently applied.
Lines with hints: 135, 193, 233, 345, 472, 574
Lines without hints: Most method parameters and returns
```python
from typing import Dict, List, Tuple, Optional, Any, Union

def prepare_run(
    self,
    workflow_code: str,
    guid: str,
    context: Dict[str, Any],
    package: str,
    archetype: str
) -> Tuple[str, List[Dict], str, str, str]:
    ...
```
```python
from typing import Any, Dict, TypedDict

class WorkflowNode(TypedDict):
    name: str
    type: str
    input_guid: str
    up: str
    down: str
    param1: str
    param2: str
    param3: str

class WorkflowContext(TypedDict, total=False):
    guid: str
    processguid: str
    param1: str
    param2: str
    param3: str
    result: Dict[str, Any]
    _sys_user: str
```
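TypedDicts are plain dicts at runtime; their value is in static checking. A sketch with a trimmed field set (the `first_api_node` helper is illustrative, not an existing function):

```python
from typing import List, Optional, TypedDict

class WorkflowNodeDemo(TypedDict):
    # Trimmed field set for the demo
    name: str
    type: str
    input_guid: str

def first_api_node(nodes: List[WorkflowNodeDemo]) -> Optional[WorkflowNodeDemo]:
    # mypy flags typos like node["tyep"] at check time; runtime cost is zero.
    for node in nodes:
        if node["type"] == "API":
            return node
    return None

nodes: List[WorkflowNodeDemo] = [
    {"name": "pause", "type": "SLEEP", "input_guid": "g1"},
    {"name": "call_crm", "type": "API", "input_guid": "g2"},
]
```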
Verify progress with strict type checking:
```shell
mypy factory.core/ObjWorkflow.py --strict
```
Line 362:
```python
def Run(self, ...):
    gc.disable()  # Never re-enabled!
```
Problem: Disabling GC can lead to memory leaks if not re-enabled.
Fix:
```python
def Run(self, ...):
    gc_was_enabled = gc.isenabled()
    try:
        gc.disable()
        # ... workflow execution ...
    finally:
        if gc_was_enabled:
            gc.enable()
```
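The same guard can be packaged as a context manager so every entry point gets it right. A sketch; `gc_paused` is not an existing helper in the module:

```python
import gc
from contextlib import contextmanager

@contextmanager
def gc_paused():
    """Disable GC inside the block, then restore the prior state."""
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()

# Usage inside Run():
with gc_paused():
    pass  # ... workflow execution ...
```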
Lines 365-366:
```python
if DO_DEBUG:
    for n in range(1, 10):
        self.add_column("def_workflow", f"Result{n}Map", "char(255)")
```
Problem: Schema modifications in every Run() call during debug mode.
Fix: Move to initialization or migration script.
Lines 176-189: Regex pattern compilation inside method (called frequently)
Fix:
```python
# Class-level compiled pattern
class Workflow(ObjApi.ObjApi):
    _PARAM_PATTERN = re.compile(
        r'\$(guid|processguid|param[1-3]|sys_param[1-7]|[a-zA-Z0-9_]+)\$'
        r'|~~([a-zA-Z0-9_]+)~~'
    )

    def local_patch_param(self, ...):
        text = self._PARAM_PATTERN.sub(replacer, text)
```
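The `replacer` callback is not shown above; a standalone sketch of how it could resolve both capture groups. The lookup rules here (`context["result"]` plus top-level string values) are assumptions about `local_patch_param`, not its verified behavior:

```python
import re

_PARAM_PATTERN = re.compile(
    r'\$(guid|processguid|param[1-3]|sys_param[1-7]|[a-zA-Z0-9_]+)\$'
    r'|~~([a-zA-Z0-9_]+)~~'
)

def patch_param(context: dict, guid: str, processguid: str, text: str = "") -> str:
    values = {"guid": guid, "processguid": processguid or guid}
    values.update(context.get("result", {}))
    values.update({k: v for k, v in context.items() if isinstance(v, str)})

    def replacer(match):
        key = match.group(1) or match.group(2)  # $key$ or ~~key~~
        # Unknown keys are left untouched rather than replaced with ''
        return str(values.get(key, match.group(0)))

    return _PARAM_PATTERN.sub(replacer, text)
```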
Lines 395-406: String interpolation in MongoDB query
```python
query = f"""DB.workflow_result.aggregate([
    {{
        "$match": {{
            "param1": "{context["param1"]}",        # ⚠️ Injection risk
            "value": "form:{context["_sys_form"]}"  # ⚠️ Injection risk
        }}
    }}
])
"""
```
Fix: Use parameterized queries
```python
query = {
    "$match": {
        "param1": context["param1"],
        "value": f"form:{context['_sys_form']}"
    }
}
result = self.mongo_aggregate("workflow_result", [query])
```
Context dictionaries from user input used directly without validation.
Recommendation: Add input validation layer
```python
import re
from typing import Optional
from pydantic import BaseModel, validator

class WorkflowContext(BaseModel):
    guid: str
    param1: Optional[str]
    param2: Optional[str]
    param3: Optional[str]

    @validator('guid')
    def validate_guid(cls, v):
        if not re.match(r'^[a-zA-Z0-9_-]+$', v):
            raise ValueError('Invalid GUID format')
        return v
```
Many methods lack comprehensive docstrings with parameter descriptions, return types, and examples.
Current (line 135):
```python
def local_patch_param(self, context: dict, guid: str, processguid: str, text: str = "") -> str:
    """
    Replaces placeholders in a text string...
    """
```
Improved:
```python
def local_patch_param(
    self,
    context: dict,
    guid: str,
    processguid: str,
    text: str = ""
) -> str:
    """
    Replaces placeholders in text with values from context.

    Supports two placeholder formats:
    - $key$ - Single dollar signs
    - ~~key~~ - Double tildes

    Args:
        context: Dictionary containing replacement values
        guid: Workflow instance GUID
        processguid: Process-specific GUID (defaults to guid if empty)
        text: Text string containing placeholders

    Returns:
        Text with all placeholders replaced

    Examples:
        >>> ctx = {"result": {"status": "OK"}, "param1": "test"}
        >>> workflow.local_patch_param(ctx, "abc123", "", "Status: $status$")
        'Status: OK'

    See Also:
        - process_text(): Further text processing after replacement
    """
```
Add comprehensive module-level documentation:
```python
"""
ObjWorkflow - Workflow Orchestration Engine

This module provides the core workflow execution engine for Axion, implementing
a Directed Acyclic Graph (DAG) based workflow system with support for:

- Synchronous and asynchronous execution
- Multiple node types (API, GUI, Service, Calculation, etc.)
- Workflow chaining and nesting
- Queue-based execution via RabbitMQ
- MongoDB-based state persistence
- Context-aware parameter substitution

Architecture:
    Workflows are modeled as DAGs where:
    - Nodes represent workflow steps (API calls, calculations, decisions, etc.)
    - Edges define execution dependencies
    - Context carries state between nodes
    - Results are aggregated and stored

Key Classes:
    - Workflow: Main orchestration class
    - WorkflowNode: Individual step handler (future refactoring)
    - WorkflowContext: Execution state container (future refactoring)

Usage:
    >>> workflow = Workflow()
    >>> result = workflow.Run("CUSTOMER_ONBOARDING", guid="cust_001", context={})

See Also:
    - ObjWorkflow.md: Detailed documentation
    - factory.core/ObjWorkflow.schema: Database schema
    - ServeWorkflow.py: Service entry point
"""
```
All node functions follow similar patterns:
```python
def node_X(run_context, current_result, input_guid, node_type, name):
    try:
        # Setup
        # Execute
        # Update context
        return result, ""
    except Exception as e:
        self.error(...)
        return "", f"error:{e}"
```
Recommendation: Template Method Pattern
```python
class NodeHandlerBase:
    """Base class with common node execution logic"""

    def execute(self, run_context, current_result, input_guid, node_type, name):
        """Template method defining execution flow"""
        try:
            self.pre_execute(run_context)
            result = self.do_execute(run_context, current_result, input_guid, name)
            self.post_execute(run_context, result)
            return result, ""
        except Exception as e:
            return self.handle_error(e, name)

    def pre_execute(self, run_context):
        """Hook for setup (override if needed)"""
        pass

    def do_execute(self, run_context, current_result, input_guid, name):
        """Main execution logic (must override)"""
        raise NotImplementedError

    def post_execute(self, run_context, result):
        """Hook for cleanup (override if needed)"""
        pass

    def handle_error(self, exception, name):
        """Standard error handling"""
        self.error(f"Node {name} failed: {exception}")
        return "", f"error:{exception}"
```
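A concrete handler then overrides only `do_execute`. A runnable sketch: the base class here drops the `self.error` logging call so the snippet is self-contained, and `EchoNodeHandler` is a toy example:

```python
class NodeHandlerBase:
    def execute(self, run_context, current_result, input_guid, node_type, name):
        try:
            self.pre_execute(run_context)
            result = self.do_execute(run_context, current_result, input_guid, name)
            self.post_execute(run_context, result)
            return result, ""
        except Exception as e:
            return self.handle_error(e, name)

    def pre_execute(self, run_context):
        pass

    def do_execute(self, run_context, current_result, input_guid, name):
        raise NotImplementedError

    def post_execute(self, run_context, result):
        pass

    def handle_error(self, exception, name):
        # Logging via self.error omitted so the sketch is self-contained
        return "", f"error:{exception}"

class EchoNodeHandler(NodeHandlerBase):
    """Toy handler: only the main step is overridden."""
    def do_execute(self, run_context, current_result, input_guid, name):
        return f"{current_result}->{name}"
```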
No test file found for ObjWorkflow.
Recommendation: Create comprehensive test suite
```python
# resource.test/pytests/factory.core/test_objworkflow.py
import pytest
from unittest.mock import patch

import ObjApi
from ObjWorkflow import Workflow

class TestWorkflow:
    @pytest.fixture
    def workflow(self):
        # Patch the base-class __init__ directly: patching the ObjApi module
        # attribute would not affect the already-bound base class.
        with patch.object(ObjApi.ObjApi, "__init__", return_value=None):
            yield Workflow()

    def test_local_patch_param_guid_substitution(self, workflow):
        context = {}
        result = workflow.local_patch_param(
            context, "guid123", "proc456", "ID: $guid$"
        )
        assert result == "ID: guid123"

    def test_local_patch_param_context_result(self, workflow):
        context = {"result": {"status": "OK"}}
        result = workflow.local_patch_param(
            context, "", "", "Status: ~~status~~"
        )
        assert result == "Status: OK"

    @pytest.mark.integration
    def test_run_workflow_end_to_end(self, workflow):
        # Integration test with real database
        ...
```
Debug flags scattered throughout:
```python
DO_DEBUG = False
DO_AVRO = False
DO_STEP = False
DO_SMSSEND = False
DO_TRACKING = True
```
Lines: 16-26
Centralize configuration:
```python
from dataclasses import dataclass

@dataclass
class WorkflowConfig:
    debug: bool = False
    enable_avro: bool = False
    enable_step: bool = False
    enable_sms: bool = False
    enable_tracking: bool = True
    multiprocess_limit: int = 10

    @classmethod
    def from_env(cls):
        """Load from environment variables"""
        import os
        return cls(
            debug=os.getenv('WORKFLOW_DEBUG', 'false').lower() == 'true',
            enable_avro=os.getenv('WORKFLOW_AVRO', 'false').lower() == 'true',
            # ...
        )

    @classmethod
    def from_config_yaml(cls, config_dict):
        """Load from config.yaml"""
        return cls(**config_dict.get('workflow', {}))

class Workflow(ObjApi.ObjApi):
    def __init__(self, DB=0, Page=0, config: WorkflowConfig = None):
        super().__init__(DB)
        self.config = config or WorkflowConfig()
```
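A usage sketch with a trimmed dataclass; the `WORKFLOW_TRACKING` variable name is an assumption, since the original elides the remaining env-var mappings:

```python
import os
from dataclasses import dataclass

@dataclass
class WorkflowConfigDemo:
    debug: bool = False
    enable_tracking: bool = True

    @classmethod
    def from_env(cls):
        return cls(
            debug=os.getenv("WORKFLOW_DEBUG", "false").lower() == "true",
            # WORKFLOW_TRACKING is a hypothetical variable name
            enable_tracking=os.getenv("WORKFLOW_TRACKING", "true").lower() == "true",
        )

os.environ["WORKFLOW_DEBUG"] = "true"
config = WorkflowConfigDemo.from_env()
```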
Mix of print statements and logging methods:
```python
print(f"ObjServiceClassification not implemented {e}")  # Line 72
self.info(f"Workflow {workflow_code} started...")       # Line 361
self.debug("Run guid ", guid)                           # Line 387
self.error(f"Exception: {E}")                           # Line 213
```
```python
import structlog

class Workflow(ObjApi.ObjApi):
    def __init__(self, ...):
        super().__init__(...)
        self.logger = structlog.get_logger(__name__)

    def Run(self, workflow_code, guid="", ...):
        self.logger.info(
            "workflow_started",
            workflow_code=workflow_code,
            guid=guid,
            async_mode=self.is_workflow_async(workflow_code)
        )
```
Before:
```python
try:
    name, description = self.sql_get_values(sql)
except Exception as E:
    self.error(f"Exception: {E}")
    name = ""
```
After:
```python
try:
    name, description = self.sql_get_values(sql)
except (DatabaseError, ValueError) as e:
    self.logger.error(
        "Failed to fetch workflow basename",
        webhook_code=webhook_code,
        error=str(e),
        exc_info=True
    )
    name = ""
    description = ""
```
Before:
```python
WORKFLOW_BUFFER = dict()

class Workflow:
    def get_connected_workflows(self, ...):
        global WORKFLOW_BUFFER
        WORKFLOW_BUFFER.setdefault(workflow_code, {}).update({...})
```
After:
```python
class Workflow:
    def __init__(self):
        self._workflow_buffer = {}
        self._buffer_lock = threading.RLock()

    def get_connected_workflows(self, ...):
        with self._buffer_lock:
            self._workflow_buffer.setdefault(workflow_code, {}).update({...})
```
| Metric | Current | Target | Priority |
|---|---|---|---|
| Lines of Code | 2,358 | <1,500 | 🟡 |
| Cyclomatic Complexity | High | <10 per method | 🔴 |
| Test Coverage | 0% | >80% | 🔴 |
| Type Hint Coverage | ~20% | >90% | 🟡 |
| Global Variables | 6 | 0 | 🔴 |
| Exception Handling | Poor | Good | 🔴 |
| Largest Method | ~1000 lines | <100 lines | 🔴 |
Estimated Technical Debt: ~4-6 weeks of refactoring work
Separate Concerns: Split into multiple modules
- ObjWorkflowExecutor - Execution engine
- ObjWorkflowNode - Node handlers
- ObjWorkflowContext - Context management
- ObjWorkflowQueue - Queue management
Event-Driven Architecture: Use events for workflow state changes
```python
class WorkflowEventBus:
    def emit(self, event: WorkflowEvent):
        for listener in self._listeners:
            listener.handle(event)
```
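A fuller runnable sketch, adding the `__init__`, a `subscribe` method, and a minimal `WorkflowEvent`, none of which exist yet; plain callables stand in for `.handle()` listener objects for brevity:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

@dataclass
class WorkflowEvent:
    name: str                                   # e.g. "node_completed"
    payload: Dict[str, Any] = field(default_factory=dict)

class WorkflowEventBus:
    def __init__(self):
        self._listeners: List[Callable[[WorkflowEvent], None]] = []

    def subscribe(self, listener: Callable[[WorkflowEvent], None]):
        self._listeners.append(listener)

    def emit(self, event: WorkflowEvent):
        for listener in self._listeners:
            listener(event)

bus = WorkflowEventBus()
seen: List[WorkflowEvent] = []
bus.subscribe(seen.append)
bus.emit(WorkflowEvent("node_completed", {"node": "validate_input"}))
```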
Plugin System: Allow custom node types
```python
class NodePluginRegistry:
    def register(self, node_type: str, handler: NodeHandler):
        self._handlers[node_type] = handler
```
Workflow DSL: Consider YAML/JSON workflow definitions
```yaml
workflow:
  name: CUSTOMER_ONBOARDING
  nodes:
    - id: validate_input
      type: service
      service: InputValidator
    - id: create_account
      type: api
      depends_on: [validate_input]
```
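Executing such a definition requires resolving `depends_on` into an order. A sketch, assuming the YAML/JSON has already been parsed into a dict (a simple repeated-scan topological sort; `execution_order` is a hypothetical helper):

```python
from typing import Dict, List

def execution_order(definition: Dict) -> List[str]:
    """Resolve depends_on edges into a runnable order (repeated-scan topsort)."""
    nodes = {n["id"]: n.get("depends_on", []) for n in definition["workflow"]["nodes"]}
    order: List[str] = []
    resolved = set()
    while len(order) < len(nodes):
        progressed = False
        for node_id, deps in nodes.items():
            if node_id not in resolved and all(d in resolved for d in deps):
                order.append(node_id)
                resolved.add(node_id)
                progressed = True
        if not progressed:
            raise ValueError("cycle detected in workflow definition")
    return order

# Mirrors the YAML above, already parsed into a dict:
definition = {
    "workflow": {
        "name": "CUSTOMER_ONBOARDING",
        "nodes": [
            {"id": "validate_input", "type": "service", "service": "InputValidator"},
            {"id": "create_account", "type": "api", "depends_on": ["validate_input"]},
        ],
    }
}
```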
ObjWorkflow.py is a functionally complete but architecturally challenged codebase. The primary issues are mutable global state, a monolithic run_workflow() method with 25+ nested functions, overly broad exception handling, and the absence of automated tests.
Implementing the recommended changes will significantly improve code quality, maintainability, and reliability while preserving existing functionality.
Next Steps: