NOTICE: All information contained herein is, and remains
the property of TechnoCore Automate.
The intellectual and technical concepts contained
herein are proprietary to TechnoCore Automate and
dissemination of this information or reproduction of this material
is strictly forbidden unless prior written permission is obtained
from TechnoCore Automate.
The ObjWorkflow module is a powerful, data-driven engine responsible for orchestrating and executing complex business processes. It models these processes as Directed Acyclic Graphs (DAGs), where each step is a node and dependencies define the directed, acyclic flow.
The engine interprets workflow definitions stored in a database, manages the flow of data between steps, and integrates with various other modules to perform tasks such as calculations, external API calls, sending notifications, and generating documents. It is designed to be highly flexible, supporting both synchronous and asynchronous execution, conditional branching, and robust error handling. It serves as the backbone for automating processes initiated by user actions (e.g., form submissions), scheduled events, or API calls.
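The DAG model described above can be illustrated with a small sketch. The node names and dependency layout here are invented for illustration; real definitions live in the database, not in Python dicts.

```python
# Illustrative only: a hypothetical workflow expressed as a DAG, validated
# with the standard-library topological sorter (raises graphlib.CycleError
# if a cycle sneaks in, which a valid workflow definition must not contain).
from graphlib import TopologicalSorter

# Each key is a step; its value is the set of steps it depends on.
workflow_dag = {
    "START": set(),
    "CALC_SCORE": {"START"},
    "SEND_EMAIL": {"CALC_SCORE"},
    "DONE": {"SEND_EMAIL"},
}

def execution_order(dag):
    """Return a valid execution order for the DAG's nodes."""
    return list(TopologicalSorter(dag).static_order())

print(execution_order(workflow_dag))  # ['START', 'CALC_SCORE', 'SEND_EMAIL', 'DONE']
```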
## Workflow Class

The primary component of the module is the Workflow class, which inherits from ObjApi, ObjRecurringMixin, and ObjVersionMixin.
### __init__(...)

When a Workflow object is instantiated, it performs the following key actions:

- Calls the constructor of the parent ObjApi class, initializing shared infrastructure such as kafka, payload, and User.
- Performs a one-time table check (guarded by CREATE_TABLE_CHECK). If the check has not been performed yet during the current execution, it calls the create_tables_from_yaml("ObjWorkflow") method. This method reads the resource.schema/ObjWorkflow.yaml file and creates the necessary database tables (def_workflow, def_workflows, etc.) if they do not already exist. This makes the module self-contained and simplifies deployment.

### Run(...)

This is the main entry point for executing a workflow. It orchestrates the entire process, from setup to completion.
Parameters:
- workflow_code (str): The unique identifier for the workflow to be executed.
- guid (str, optional): A unique ID for the specific instance of the workflow run. If not provided, one may be extracted from the context.
- context (dict, optional): A dictionary containing the initial data and state for the workflow. This context is passed through and modified by each node in the workflow.
- rerun (int, optional): A flag indicating whether this is a re-run of a previous workflow. Defaults to 0 (not a re-run).
- simul_guid (str, optional): A GUID for simulation purposes.
- force_async_run (bool, optional): A flag to force asynchronous execution.

Execution Logic:
- Determines the execution mode by calling is_workflow_async().
- If the workflow is asynchronous, the run is queued (via workflow_queue_context) for a separate worker process to pick up.
- If the workflow is synchronous, the engine executes it immediately by calling run_workflow().

### run_workflow(...)

This is the internal engine that processes the sequence of nodes defined for a workflow.
Logic:
- Loads the workflow definition from the WORKFLOW_BUFFER.
- Initializes the run_context dictionary, which holds the state and results as the workflow progresses.
- Iterates over the nodes, dispatching each one according to its type (e.g., CALC, SERVICE, EMAIL).
- Each node is executed by its node_* handler function, which updates the run_context with its results and returns a value.
- Determines the next node from the BranchSql or BranchDirect definitions and the result of the current node.
- Continues until a DONE node is reached or an error occurs.
- Persists the outcome of the run (e.g., via track_workflow_result or MongoDB).

### prepare_run(...)

This method sets up the necessary data before run_workflow is called. It queries the database for the workflow definition (the list of nodes and their properties from def_workflows) and determines the process_guid.
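A drastically simplified sketch of the run_workflow node loop described above. The handler functions and node-list shape are invented for this sketch; the real engine loads nodes from def_workflows and adds buffering, SQL-based branching, async dispatch, and result tracking.

```python
# Minimal node-dispatch loop. Only BranchDirect-style jumps are shown;
# BranchSql evaluation and error handling are omitted.
def node_calc(run_context):
    run_context["_calc_total"] = run_context.get("amount", 0) * 2
    return run_context

def node_done(run_context):
    run_context["status"] = "DONE"
    return run_context

HANDLERS = {"CALC": node_calc, "DONE": node_done}

def run_workflow(nodes, run_context):
    """nodes: {name: {"Type": node_type, "BranchDirect": next_name_or_None}}"""
    current = next(iter(nodes))  # assume the first entry is the start node
    while current is not None:
        node = nodes[current]
        run_context = HANDLERS[node["Type"]](run_context)
        if node["Type"] == "DONE":
            break
        current = node.get("BranchDirect")  # follow the direct branch
    return run_context

nodes = {
    "calc_step": {"Type": "CALC", "BranchDirect": "finish"},
    "finish": {"Type": "DONE", "BranchDirect": None},
}
result = run_workflow(nodes, {"amount": 21})
print(result)  # {'amount': 21, '_calc_total': 42, 'status': 'DONE'}
```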
### closure(...)

A utility method used to analyze and validate a workflow definition. It walks through the nodes to ensure they are correctly linked and updates the definition with metadata, such as GUIDs and versioning.
## Node Types

The power of the workflow engine comes from its variety of node types. Each type corresponds to a specific action.
| Node Type | Description |
|---|---|
| API | Invokes a generic API call. |
| CALC | Executes a set of calculations defined in def_service_calculation. The results are merged into the context. |
| SERVICE | Runs a custom service class. The engine dynamically loads the service object and calls its run_workflow_direct method. |
| WEBHOOK | Triggers an external or internal webhook, passing the current context. |
| DECISION | Processes a decision table or rule set to determine an outcome. |
| SCORECARD | Computes a score based on a predefined scorecard model. |
| EMAIL | Stages an email into the MongoDB channel queue for async delivery via ServeConversation. |
| SENDMAIL | Sends email directly via SMTP. Supports ObjTemplate email templates, AI summaries, and auto-generated detail sections. |
| SMS | Sends an SMS message. The recipient and message content are pulled from the context. |
| FEATURESTORE | Computes, validates, and manages feature store tables. Operations: COMPUTE, INCREMENTAL, VALIDATE, STATS, CREATE_TABLE. |
| FEATURERENDER | Renders feature store output into templates. Merges features, source data, and constants. Modes: SINGLE, BULK. |
| FORMFLOW | Transitions the user to a specific form in a user interface. |
| REPORTFLOW | Transitions the user to a specific report. |
| DOCUMENT | Performs operations on documents, such as generating a PDF from a template (DOCTEMPLATE) or extracting data (DOCEXTRACT). |
| EXPORT | Executes a data export process defined in def_dataexport. |
| IMPORT | Executes a data import process. |
| ACL | Performs Access Control List operations like user login, session validation, or password reset. |
| DATATRANSFER | Executes a data transfer operation between different systems or databases. |
| GATE | Generic gateway node for process modeling. Does not execute any logic, used for BPMN diagram compatibility. |
| GATE-EXCLUSIVE | Synonym for GATE. Exclusive gateway (XOR) for BPMN modeling. Does not execute logic - exclusive branching is implemented via BranchSql on all nodes. |
| GATE-PARALLEL | Parallel gateway (AND). Future: Kickstarts multiple workflows and waits for all to complete. Currently for modeling only. |
| GATE-INCLUSIVE | Inclusive gateway (OR). Future: Activates multiple branches based on conditions. Currently for modeling only. |
| DONE | Marks the successful completion of a workflow path. |
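Several node types above rely on BranchSql for conditional branching. The actual BranchSql format used by the engine is not documented in this text; purely as an illustration, a SQL branch condition could be evaluated against context values bound as named parameters:

```python
# Illustration only: evaluate a boolean SQL expression against the run
# context via an in-memory SQLite connection. The function name and the
# condition syntax are assumptions, not the engine's real BranchSql format.
import sqlite3

def branch_condition_holds(branch_sql, run_context):
    conn = sqlite3.connect(":memory:")
    try:
        row = conn.execute(f"SELECT ({branch_sql})", run_context).fetchone()
        return bool(row[0])
    finally:
        conn.close()

print(branch_condition_holds(":score >= 700", {"score": 720}))  # True
```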
The workflow engine uses a Strategy Pattern implementation through the ObjNodeRegistry to dynamically dispatch node execution to appropriate handlers. This provides a plugin-style architecture that makes it easy to add new node types without modifying core workflow logic.
The ObjNodeRegistry is a self-registering dispatcher that maintains a mapping from node types to their executor classes.
Key Features:
- Uses the WorkflowNodeType enum for node type constants.

Each node executor class in factory.workflow/ registers itself with a decorator:
```python
from ObjNodeRegistry import ObjNodeRegistry
from ObjEnum import WorkflowNodeType

@ObjNodeRegistry.register(WorkflowNodeType.API.value)
class ObjWorkflowApi(ObjWorkflowNode):
    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        # Execute API node logic
        return run_context, current_result
```
Multiple node types can share the same executor:
```python
@ObjNodeRegistry.register(WorkflowNodeType.FORM, WorkflowNodeType.GUI)
class ObjWorkflowGui(ObjWorkflowNode):
    # Handles both FORM and GUI nodes
    pass
```
During workflow execution, the engine uses the registry to get the appropriate executor:
```python
if ObjNodeRegistry.is_registered(node_type):
    # Get executor instance for this node type
    executor = ObjNodeRegistry.get_executor(node_type, self)
    # Execute the node
    proxied_executor = self._proxy_factory(executor)
    run_context, current_result = proxied_executor.execute(
        run_context, current_result, input_guid, node_type, name, **kwargs
    )
```
All node executors inherit from ObjWorkflowNode:
```python
class ObjWorkflowNode:
    def __init__(self, workflow_instance):
        self.workflow = workflow_instance
        self.DB = workflow_instance.DB

    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        raise NotImplementedError("Subclasses must implement execute()")
```
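The registry's internals are not shown in this document. A minimal self-registering dispatcher with the same public surface (register supporting multiple node types, is_registered, get_executor) could look like the following sketch; the internal dict and the absence of error handling are assumptions.

```python
# Minimal sketch of a self-registering node dispatcher (Strategy Pattern).
class ObjNodeRegistry:
    _executors = {}  # node type string -> executor class

    @classmethod
    def register(cls, *node_types):
        """Class decorator: map one or more node types to the executor."""
        def decorator(executor_cls):
            for node_type in node_types:
                cls._executors[node_type] = executor_cls
            return executor_cls
        return decorator

    @classmethod
    def is_registered(cls, node_type):
        return node_type in cls._executors

    @classmethod
    def get_executor(cls, node_type, workflow_instance):
        """Instantiate the registered executor for this node type."""
        return cls._executors[node_type](workflow_instance)

class ObjWorkflowNode:
    def __init__(self, workflow_instance):
        self.workflow = workflow_instance

@ObjNodeRegistry.register("FORM", "GUI")  # one executor, two node types
class ObjWorkflowGui(ObjWorkflowNode):
    def execute(self, run_context, current_result, *args, **kwargs):
        return run_context, current_result

print(ObjNodeRegistry.is_registered("GUI"))  # True
```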
### Node Executors (factory.workflow/ObjWorkflow*.py)

Node executors are registered at import time by ObjWorkflow.py. 31 node types are currently registered in total.
To add a new node type:
1. Add the new type to the enum in factory.core/ObjEnum.py:

```python
class WorkflowNodeType(StrEnum):
    # ... existing types
    MYNEWTYPE = "MYNEWTYPE"
```
2. Create the executor in factory.workflow/ObjWorkflowMyNewType.py:

```python
from ObjNodeRegistry import ObjNodeRegistry
from ObjWorkflowNode import ObjWorkflowNode
from ObjEnum import WorkflowNodeType

@ObjNodeRegistry.register(WorkflowNodeType.MYNEWTYPE.value)
class ObjWorkflowMyNewType(ObjWorkflowNode):
    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        # Your logic here
        return run_context, current_result
```
3. No changes are needed in ObjWorkflow.py - the registry auto-discovers the new type.

## Gateway Nodes

Gateway nodes are specialized node types designed for process modeling and BPMN diagram compatibility. They represent decision points and flow control in workflow diagrams but do not execute business logic themselves.
- GATE (Generic Gateway)
- GATE-EXCLUSIVE (XOR Gateway): a synonym for GATE - both behave identically, and exclusive branching is implemented via the BranchSql mechanism.
- GATE-PARALLEL (AND Gateway)
- GATE-INCLUSIVE (OR Gateway)

Gateway nodes follow a "model first, execute later" approach.
Gateway nodes are registered in the ObjNodeRegistry like all other node types:
```python
# factory.workflow/ObjWorkflowGateParallel.py
@ObjNodeRegistry.register(WorkflowNodeType.GATE_PARALLEL.value)
class ObjWorkflowGateParallel(ObjWorkflowNode):
    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        # Placeholder - future implementation for parallel split/join
        return run_context, current_result

# factory.workflow/ObjWorkflowGateExclusive.py
@ObjNodeRegistry.register(
    WorkflowNodeType.GATE_EXCLUSIVE.value,
    WorkflowNodeType.GATE.value  # GATE is an alias for GATE-EXCLUSIVE
)
class ObjWorkflowGateExclusive(ObjWorkflowNode):
    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        # Pass through - exclusive branching handled by BranchSql
        return run_context, current_result

# factory.workflow/ObjWorkflowGateInclusive.py
@ObjNodeRegistry.register(WorkflowNodeType.GATE_INCLUSIVE.value)
class ObjWorkflowGateInclusive(ObjWorkflowNode):
    def execute(self, run_context, current_result, input_guid, node_type, name, **kwargs):
        # Placeholder - future implementation for inclusive choice
        return run_context, current_result
```
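The "Future" semantics of GATE-PARALLEL (kick off several branches, wait for all to complete) might eventually resemble a thread-pool fan-out/fan-in. Everything below is speculative illustration of that intended behavior, not current engine code; the branch functions and merge strategy are invented.

```python
# Speculative sketch of an AND-gateway: run all branches concurrently,
# then merge their partial results back into one context (AND-join).
from concurrent.futures import ThreadPoolExecutor

def branch_a(context):
    return {"a_done": True}

def branch_b(context):
    return {"b_done": True}

def gate_parallel(branches, context):
    """Run all branches concurrently and merge their results."""
    with ThreadPoolExecutor() as pool:
        # Each branch gets its own copy of the context to avoid races.
        results = pool.map(lambda fn: fn(dict(context)), branches)
    merged = dict(context)
    for partial in results:
        merged.update(partial)
    return merged

ctx = gate_parallel([branch_a, branch_b], {"guid": "X1"})
print(ctx)  # {'guid': 'X1', 'a_done': True, 'b_done': True}
```

Note that dict.update is a naive merge; a real implementation would need a conflict policy for keys written by more than one branch.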
## Database Tables

The module relies on a set of database tables to define and track workflows. These are automatically created on initialization if they don't exist.
- def_workflow: The main definition of a workflow, including its name, package, and high-level properties like Async. Contains a Module column for organisational grouping.
- def_workflows: Defines the individual steps (nodes) within each workflow, including their type, rank (order), and branching logic (BranchSql, BranchDirect). Contains a Module column for organisational grouping.
- def_workflow_actors: Defines the roles or actors involved in a workflow.
- def_workflow_transitions: Logs the transitions between workflow nodes for tracking and auditing.

## Constants and Enums

The workflow system uses centralized constants and enums to ensure consistency and prevent magic values throughout the codebase.
Defined in factory.core/ObjConstants.py, the WorkflowConstants class contains workflow-specific constants:
```python
class WorkflowConstants:
    # Simulation
    SIMULATION_LIVE_SUFFIX: str = "_LIVE"  # Appended to GUID for live simulation runs

    # Queue management
    QUEUE_UNINITIALIZED: int = 0  # Indicates queue not yet initialized

    # Workflow visualization lanes
    LANE_GUI: str = "GUI"          # Lane for GUI/form nodes in diagrams
    LANE_PROCESS: str = "PROCESS"  # Lane for business process nodes
    LANE_SERVICE: str = "SERVICE"  # Lane for service execution nodes

    # Multiprocessing
    MULTIPROCESS_LIMIT: int = 10   # Maximum parallel workflow processes
```
Usage Example:
```python
from ObjConstants import WorkflowConstants

# Check if workflow is in simulation mode
if guid.endswith(WorkflowConstants.SIMULATION_LIVE_SUFFIX):
    # Handle live simulation
    pass

# Initialize queue
if self.queue == WorkflowConstants.QUEUE_UNINITIALIZED:
    self.queue = self._initialize_queue()
```
The WorkflowNodeType enum, defined in factory.core/ObjEnum.py, provides type-safe node type constants:
```python
class WorkflowNodeType(StrEnum):
    # Core nodes
    START = "START"
    DONE = "DONE"
    SLEEP = "SLEEP"

    # Communication
    EMAIL = "EMAIL"
    SMS = "SMS"
    NOTIFY = "NOTIFY"

    # Data operations
    CALC = "CALC"
    SERVICE = "SERVICE"
    DATAEXPORT = "DATAEXPORT"
    DATAIMPORT = "DATAIMPORT"
    DATATRANSFER = "DATATRANSFER"

    # Documents
    DOCUMENT = "DOCUMENT"
    DOCTEMPLATE = "DOCTEMPLATE"
    DOCEXTRACT = "DOCEXTRACT"
    DOCMERGE = "DOCMERGE"
    DOCARCHIVE = "DOCARCHIVE"

    # UI flow
    FORM = "FORM"
    GUI = "GUI"
    FORMFLOW = "FORMFLOW"
    FORMGUI = "FORMGUI"
    REPORTFLOW = "REPORTFLOW"
    REPORTGUI = "REPORTGUI"

    # Integration
    API = "API"
    WEBHOOK = "WEBHOOK"
    SCHEDULER = "SCHEDULER"

    # Processing
    DECISION = "DECISION"
    SCORECARD = "SCORECARD"
    PROFILE = "PROFILE"
    FEATURESTORE = "FEATURESTORE"

    # Security
    ACL = "ACL"

    # Gateways
    GATE = "GATE"
    GATE_EXCLUSIVE = "GATE-EXCLUSIVE"
    GATE_PARALLEL = "GATE-PARALLEL"
    GATE_INCLUSIVE = "GATE-INCLUSIVE"
    GATE_FAN_OUT = "GATE-FAN-OUT"
    GATE_FAN_IN = "GATE-FAN-IN"
    GATE_GATHER = "GATE-GATHER"
```
Benefits:

- Prevents typos such as "CALC" vs "CACL" from slipping through as magic strings.

Usage Example:
```python
from ObjEnum import WorkflowNodeType

# Type-safe node type check
if node_type == WorkflowNodeType.CALC.value:
    # Execute calculation
    pass

# Register node executor
@ObjNodeRegistry.register(WorkflowNodeType.API.value)
class ObjWorkflowApi(ObjWorkflowNode):
    pass
```
The EXTRACT_CONTEXT_PREFIXES constant defines prefixes for extracting parameters from workflow context:
```python
EXTRACT_CONTEXT_PREFIXES: list[str] = [
    "_form_",     # Form field values
    "_calc_",     # Calculation results
    "_api_",      # API response data
    "_sys_",      # System parameters
    "_service_",  # Service execution results
    "sys_",       # Alternative system prefix
    "",           # Catch-all for non-prefixed parameters
]
```
These prefixes are used throughout the workflow system to identify and extract specific types of context variables.
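As a sketch of how such prefix-based extraction might work (the helper name and return shape are assumptions, not the module's real API):

```python
# Extract prefixed parameters from a workflow context dict.
EXTRACT_CONTEXT_PREFIXES = ["_form_", "_calc_", "_api_", "_sys_", "_service_", "sys_", ""]

def extract_by_prefix(context, prefix):
    """Return {stripped_key: value} for keys carrying the given prefix."""
    return {
        key[len(prefix):]: value
        for key, value in context.items()
        if key.startswith(prefix)
    }

ctx = {"_form_email": "a@b.c", "_calc_score": 720, "plain": 1}
print(extract_by_prefix(ctx, "_form_"))  # {'email': 'a@b.c'}
print(extract_by_prefix(ctx, ""))        # everything (catch-all prefix)
```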
## Command-Line Interface

The module includes a CLI for administrative and testing purposes, built with typer.
### run

Executes a workflow from the command line.
Usage:
```shell
python3 factory.core/ObjWorkflow.py run <WORKFLOW_CODE> [OPTIONS]
```
Example:
```shell
# Run the 'USER_REGISTRATION' workflow with a specific GUID
python3 factory.core/ObjWorkflow.py run USER_REGISTRATION --guid "USER12345"
```
### closure

Runs the closure and validation logic on a workflow definition, then exports a Piper flow file for visualization. Delegates to ObjWorkflowEdit.closure() and ObjWorkflowVisual.export_to_piper().
Usage:
```shell
python3 factory.core/ObjWorkflow.py closure <WORKFLOW_CODE>
```
### preflight

Performs a pre-flight check on the environment and workflow configurations.
Usage:
```shell
python3 factory.core/ObjWorkflow.py preflight
```
### review

Generates an AI-powered workflow review for documentation.
Usage:
```shell
python3 factory.core/ObjWorkflow.py review <WORKFLOW_CODE> [--model MODEL]
```
### direct

Runs a workflow directly with a given GUID, calling closure first.
Usage:
```shell
python3 factory.core/ObjWorkflow.py direct <WORKFLOW_CODE> <GUID>
```
### resume

Resumes a PAUSED workflow execution from stage_workflow by GUID.
Usage:
```shell
python3 factory.core/ObjWorkflow.py resume <GUID>
```
## Recurring Workflows

The Workflow class inherits ObjRecurringMixin (from factory.core/extend.delegate/) with _recurring_module_type = "WORKFLOW". This provides cron-based scheduling via the shared def_recurring table.
When a recurring workflow fires:
1. recurring_check_due() finds all rows in def_recurring where ModuleType = "WORKFLOW" and NextRunDate <= NOW().
2. _recurring_execute(row) parses the Payload column as JSON context, generates a guid, and calls run_direct(workflow_name, guid, context).
3. NextRunDate is advanced to the next cron occurrence and RunCounter is incremented.

| Method | Description |
|---|---|
| recurring_create(name, cron_expression, package, project, payload) | Schedule a workflow to run on a cron expression. name is the workflow code, payload is optional JSON context. |
| recurring_check_due() | Execute all due recurring workflows and advance their schedules. |
| recurring_list(package) | List all recurring workflow schedules for a package. |
| recurring_cancel(guid) | Deactivate a recurring schedule. |
```python
wf = Workflow()
wf.recurring_create(
    name="HCCOLLECTIONS",
    cron_expression="0 6 * * 1-5",
    package="HOMECHOICE",
    payload='{"param1": "BATCH"}',
)
```
See ObjRecurringMixin.md for the shared table schema and full mixin documentation.
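The due-check described above can be sketched against a simplified def_recurring table. The column subset here is assumed from the text (the real schema lives in ObjRecurringMixin), and the actual run_direct call is stubbed out as a comment.

```python
# Sketch of recurring_check_due() against a simplified def_recurring table.
import json
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE def_recurring (
        Name TEXT, ModuleType TEXT, NextRunDate TEXT,
        Payload TEXT, RunCounter INTEGER
    )
""")
past = (datetime.now() - timedelta(minutes=5)).isoformat()
conn.execute(
    "INSERT INTO def_recurring VALUES (?, 'WORKFLOW', ?, ?, 0)",
    ("HCCOLLECTIONS", past, '{"param1": "BATCH"}'),
)

# Find due WORKFLOW rows, parse Payload as context, then advance the schedule.
due = conn.execute(
    "SELECT Name, Payload FROM def_recurring "
    "WHERE ModuleType = 'WORKFLOW' AND NextRunDate <= ?",
    (datetime.now().isoformat(),),
).fetchall()
for name, payload in due:
    context = json.loads(payload or "{}")
    # ... here the mixin would call run_direct(name, guid, context) ...
    conn.execute(
        "UPDATE def_recurring SET RunCounter = RunCounter + 1 WHERE Name = ?",
        (name,),
    )
print(due)  # [('HCCOLLECTIONS', '{"param1": "BATCH"}')]
```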
## Related Modules

The workflow system is composed of several specialized modules. Visualization and CRUD operations have been refactored out of ObjWorkflow into dedicated modules:
| Module | Location | Purpose |
|---|---|---|
| ObjWorkflowVisual | factory.core/extend.visual/ | Workflow diagram generation and visualization. Contains visualise(), visualise_piper(), and export_to_piper() methods for rendering workflow DAGs as Piper flow diagrams. |
| ObjWorkflowEdit | factory.core/extend.edit/ | CRUD operations for workflow definitions. Provides closure() for validating and updating workflow node linkage, and load/save operations for def_workflow and def_workflows tables. |
| ObjNodeRegistry | factory.core/ | Node executor strategy registry. Self-registering dispatcher that maps WorkflowNodeType values to their executor classes. Node executors in factory.workflow/ register themselves via @ObjNodeRegistry.register() decorators at import time. |
| ObjWorkflowNode | factory.workflow/ | Base class for all node executors. Defines the execute() interface that every node type must implement. |
| ObjWorkflowSimul | factory.core/ | Simulation orchestration tool for batch-running workflows under test conditions. Not a workflow node and not registered in ObjNodeRegistry. |
| ObjRecurringMixin | factory.core/extend.delegate/ | Generic cron-based recurring schedule mixin. Workflow uses it with ModuleType = "WORKFLOW" against the shared def_recurring table. |
| ObjVersionMixin | factory.core/ | Universal version history tracking. Registered with main_table: def_workflow, nodes_table: def_workflows. |