NOTICE: All information contained herein is, and remains
the property of TechnoCore.
The intellectual and technical concepts contained
herein are proprietary to TechnoCore and dissemination of this information or reproduction of this material
is strictly forbidden unless prior written permission is obtained
from TechnoCore.
ObjWorkflowNode is the base class for all workflow node type implementations. It provides common functionality, helper methods, and a consistent interface that all workflow nodes inherit from.
This architecture separates node-specific logic into individual, maintainable modules while sharing common patterns through inheritance.
class ObjWorkflowNode:
    def __init__(self, workflow_instance):
        """
        Args:
            workflow_instance: Reference to parent Workflow object
        """
        self.workflow = workflow_instance
        self._Starttime = 0.0
        self._Endtime = 0.0
All node classes receive a reference to the parent Workflow instance, providing access to:
| Property | Type | Description |
|---|---|---|
| DB | Database | Database connection from workflow instance |
| package | str | Package name (e.g., 'homechoice', 'core') |
| archetype | str | Archetype identifier |
All logging is delegated to the workflow instance:
def debug(self, *args, **kwargs)
def error(self, *args, **kwargs)
def note(self, *args, **kwargs)
def info(self, *args, **kwargs)
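A minimal sketch of what this delegation might look like, assuming the workflow instance exposes logging methods with the same names. StubWorkflow and LoggingNode are hypothetical stand-ins used only for illustration; they are not part of the actual code base.

```python
class StubWorkflow:
    """Hypothetical stand-in for the parent Workflow; records log calls."""

    def __init__(self):
        self.records = []

    def debug(self, *args, **kwargs):
        self.records.append(("debug", args))

    def error(self, *args, **kwargs):
        self.records.append(("error", args))


class LoggingNode:
    """Hypothetical node that forwards every log call to its workflow."""

    def __init__(self, workflow_instance):
        self.workflow = workflow_instance

    def debug(self, *args, **kwargs):
        # Assumption: the workflow exposes debug() with the same signature.
        return self.workflow.debug(*args, **kwargs)

    def error(self, *args, **kwargs):
        # Assumption: the workflow exposes error() with the same signature.
        return self.workflow.error(*args, **kwargs)


workflow = StubWorkflow()
node = LoggingNode(workflow)
node.debug("Processing...")
node.error("Failed!")
```

Forwarding through the workflow keeps all output in one place, so a node never needs its own logger configuration.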
def get_queries(self, query_name: str) -> str
# Get SQL query template from YAML configuration
def sql_get_value(self, sql: str) -> str
# Execute SQL and return single value
def sql_get_values(self, sql: str) -> tuple
# Execute SQL and return multiple values
def get_deployment(self) -> str
# Returns: 'PROD', 'UAT', 'DEV', 'SIM'
def get_uuid(self, prefix: str = "") -> str
# Generate unique identifier
def local_patch_param(context: dict, guid: str, processguid: str, text: str) -> str
# Replace $placeholders$ in text with context values
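The placeholder replacement can be sketched roughly as follows. This is an illustration only: patch_placeholders is a hypothetical name, the guid and processguid parameters are left out because the source does not describe how they are used, and keeping unknown placeholders unchanged is an assumption.

```python
import re


def patch_placeholders(context: dict, text: str) -> str:
    """Replace $name$ tokens in text with str(context[name])."""

    def substitute(match):
        key = match.group(1)
        # Assumption: placeholders without a context value are left untouched.
        return str(context[key]) if key in context else match.group(0)

    return re.sub(r"\$(\w+)\$", substitute, text)


patched = patch_placeholders(
    {"age": 45, "call_priority": "HIGH"},
    "age=$age$ priority=$call_priority$ other=$unknown$",
)
```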
def start_timer(self, node_name: str):
# Start timing node execution
def end_timer(self) -> float:
# End timing and return elapsed seconds
Usage:
self.start_timer("MyService")
# ... perform work ...
elapsed = self.end_timer() # Returns: 0.1234 (seconds)
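One plausible way to implement the two timer methods is sketched below, using the _Starttime and _Endtime fields from the constructor. TimedNode is a hypothetical class, and both the choice of time.perf_counter and the _timer_name attribute are assumptions; the real implementation may differ.

```python
import time


class TimedNode:
    """Hypothetical class mirroring the timer fields set in the constructor."""

    def __init__(self):
        self._Starttime = 0.0
        self._Endtime = 0.0
        self._timer_name = ""  # assumption: the node name is kept for logging

    def start_timer(self, node_name: str) -> None:
        self._timer_name = node_name
        self._Starttime = time.perf_counter()

    def end_timer(self) -> float:
        self._Endtime = time.perf_counter()
        return self._Endtime - self._Starttime


node = TimedNode()
node.start_timer("MyService")
time.sleep(0.01)  # stand-in for real work
elapsed = node.end_timer()
```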
def update_result_context(
    run_context: dict,
    result_key: str,
    result_value: Any
) -> dict:
    # Helper to update run_context['result'] safely
Usage:
run_context = self.update_result_context(
    run_context,
    "_service_payment",
    "SUCCESS"
)
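The "safe" part of this helper presumably means that a missing result entry is created on demand. A standalone sketch under that assumption (written as a plain function rather than a method, and not taken from the actual source):

```python
from typing import Any


def update_result_context(run_context: dict, result_key: str, result_value: Any) -> dict:
    # Assumption: a missing or non-dict "result" entry is replaced by an empty dict.
    if not isinstance(run_context.get("result"), dict):
        run_context["result"] = {}
    run_context["result"][result_key] = result_value
    return run_context


ctx = update_result_context({}, "_service_payment", "SUCCESS")
ctx = update_result_context(ctx, "_service_invoice", "PENDING")
```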
def set_service_attributes_from_context(
    service_obj: Any,
    run_context: dict,
    prefixes: list = None
):
    # Copy context values as attributes on service objects
This common pattern transfers context data to service objects. Three sources are merged onto service_obj:
- run_context['data']: each key k is set with every prefix: _form_k, _calc_k, _api_k, _sys_k, _service_k.
- run_context['result']: each top-level key is set directly. When a value is a dict (e.g. a SERVICE node result stored under _service_virtual), its nested keys are also flattened onto service_obj, so that placeholders such as $call_priority$ and $age$ resolve correctly in subsequent steps.
- run_context keys starting with _ or sys_ (string ...).
Usage:
service = SomeService(self.DB)
self.set_service_attributes_from_context(service, run_context)
# After a SERVICE node result {'age': 45, 'income': 75000.0, ...}:
# service.age = "45"
# service.income = "75000.0"
# service._service_virtual = "{'age': 45, ...}" # original nested entry
# service._calc_total = "1000" # from run_context['data']
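The merge described above could be sketched as follows. This is a hedged illustration, not the actual implementation: the default prefix list, the conversion of every value with str() (suggested by the usage comments), and the handling of top-level keys are all assumptions, and DummyService is a hypothetical class.

```python
from typing import Any, Optional

# Assumption: these are the default prefixes applied to run_context['data'] keys.
DEFAULT_PREFIXES = ["_form_", "_calc_", "_api_", "_sys_", "_service_"]


def set_service_attributes_from_context(
    service_obj: Any, run_context: dict, prefixes: Optional[list] = None
) -> None:
    prefixes = DEFAULT_PREFIXES if prefixes is None else prefixes

    # 1. run_context['data']: every key is exposed under every prefix.
    for key, value in run_context.get("data", {}).items():
        for prefix in prefixes:
            setattr(service_obj, f"{prefix}{key}", str(value))

    # 2. run_context['result']: top-level keys, plus flattened nested dicts.
    for key, value in run_context.get("result", {}).items():
        setattr(service_obj, key, str(value))
        if isinstance(value, dict):
            for nested_key, nested_value in value.items():
                setattr(service_obj, nested_key, str(nested_value))

    # 3. Top-level run_context keys starting with "_" or "sys_".
    for key, value in run_context.items():
        if key.startswith(("_", "sys_")):
            setattr(service_obj, key, str(value))


class DummyService:
    """Hypothetical service object used only for this example."""


service = DummyService()
set_service_attributes_from_context(service, {
    "data": {"total": 1000},
    "result": {"_service_virtual": {"age": 45, "income": 75000.0}},
    "sys_user": "alice",
})
```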
To create a new workflow node type:
# factory.core/ObjWorkflowMyNode.py
from ObjWorkflowNode import ObjWorkflowNode
class ObjWorkflowMyNode(ObjWorkflowNode):
    def execute(
        self,
        run_context: dict,
        current_result: str,
        input_guid: str,
        node_type: str,
        name: str
    ) -> tuple:
        """
        Execute MyNode logic.
        Returns:
            Tuple of (run_context, current_result)
        """
        self.start_timer(name)
        # Your node logic here
        self.debug(f"Executing MyNode: {name}")
        # Example: Update result
        run_context = self.update_result_context(
            run_context,
            f"_mynode_{name.lower()}",
            "PROCESSED"
        )
        elapsed = self.end_timer()
        return run_context, "SUCCESS"
Create ObjWorkflowMyNode.md with node-specific documentation.
In ObjWorkflow.py:
from ObjWorkflowMyNode import ObjWorkflowMyNode
# In run_workflow method:
node_dispatch = {
    # ...
    "MYNODE": ObjWorkflowMyNode(self).execute,
    # ...
}
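To illustrate how a dispatch table of this shape can drive execution, here is a small self-contained sketch. EchoNode, run_nodes, and the "ECHO" node type are hypothetical, and treating an unknown node type as a failure is an assumption; the real run_workflow method is not shown in this document.

```python
class EchoNode:
    """Hypothetical node; real nodes subclass ObjWorkflowNode."""

    def __init__(self, workflow_instance):
        self.workflow = workflow_instance

    def execute(self, run_context, current_result, input_guid, node_type, name):
        run_context.setdefault("visited", []).append(name)
        return run_context, "SUCCESS"


def run_nodes(workflow, nodes):
    # Map node type strings to bound execute methods, as node_dispatch does.
    node_dispatch = {"ECHO": EchoNode(workflow).execute}
    run_context, current_result = {}, ""
    for node_type, name, input_guid in nodes:
        handler = node_dispatch.get(node_type)
        if handler is None:
            current_result = "FAIL"  # assumption: unknown node types fail the run
            break
        run_context, current_result = handler(
            run_context, current_result, input_guid, node_type, name
        )
    return run_context, current_result


context, result = run_nodes(None, [("ECHO", "first", "guid-1"), ("ECHO", "second", "guid-2")])
unknown_context, unknown_result = run_nodes(None, [("NOPE", "x", "guid-3")])
```

Because every execute() shares one signature, adding a node type only requires a new dictionary entry.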
All node execute() methods follow this signature:
def execute(
    self,
    run_context: dict,       # Workflow execution context
    current_result: str,     # Result from previous node
    input_guid: str,         # GUID for input data
    node_type: str,          # Node type (CALC, SERVICE, etc.)
    name: str                # Node name from workflow definition
) -> Tuple[dict, str]:       # Returns (updated_context, new_result)
def execute(self, ...):
    self.start_timer(name)
    # ... work ...
    self.end_timer()
# Good
run_context = self.update_result_context(run_context, key, value)
# Avoid
run_context["result"][key] = value # May fail if "result" doesn't exist
# Good
self.debug("Processing...")
self.error("Failed!")
# Avoid
print("Processing...") # Doesn't use workflow logging
def execute(self, ...):
    try:
        # ... work ...
        return run_context, "SUCCESS"
    except Exception as e:
        self.error(f"Node {name} failed: {e}")
        return run_context, "FAIL"
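A runnable version of this error-handling pattern, using a hypothetical ParsingNode whose error() method simply collects messages instead of delegating to a workflow. The amount field and the parsing step are invented for the example.

```python
class ParsingNode:
    """Hypothetical node that parses an integer from run_context['data']."""

    def __init__(self):
        self.errors = []

    def error(self, *args, **kwargs):
        # Stand-in for workflow logging: collect messages for inspection.
        self.errors.append(" ".join(str(a) for a in args))

    def execute(self, run_context, current_result, input_guid, node_type, name):
        try:
            amount = int(run_context["data"]["amount"])  # raises on bad input
            run_context.setdefault("result", {})["amount"] = amount
            return run_context, "SUCCESS"
        except Exception as e:
            self.error(f"Node {name} failed: {e}")
            return run_context, "FAIL"


node = ParsingNode()
ok_context, ok_result = node.execute({"data": {"amount": "12"}}, "", "guid", "CALC", "Total")
bad_context, bad_result = node.execute({"data": {}}, "", "guid", "CALC", "Total")
```

Returning the unchanged context together with "FAIL" lets the caller decide how to proceed, rather than letting the exception abort the whole workflow.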
Version: 8.0
Last Updated: 2026-03-01