This guide provides practical instructions for creating, configuring, and executing workflows using the ObjWorkflow module - the backbone of business process automation in the Axion system.
ObjWorkflow is a powerful, data-driven engine that orchestrates complex business processes as Directed Acyclic Graphs (DAGs). Each workflow consists of nodes (steps) connected by transitions, with data flowing through a context dictionary that gets enriched at each step.
Common Use Cases:
Prerequisites: a data.config configuration file and an activated virtual environment (source dev-env/bin/activate).
from ObjWorkflow import Workflow
# Initialize workflow engine
wf = Workflow(0)
# Execute a workflow with context data
context = {
"user_email": "john@example.com",
"user_name": "John Doe",
"registration_date": "2025-12-26"
}
result = wf.Run(
workflow_code="USER_REGISTRATION",
guid="USER12345",
context=context
)
print(f"Workflow completed: {result}")
# Check if workflow runs asynchronously
is_async = wf.is_workflow_async("USER_REGISTRATION")
print(f"Async workflow: {is_async}")
# Check if workflow has GUI elements
has_gui = wf.is_workflow_gui("USER_REGISTRATION")
print(f"Has GUI: {has_gui}")
A workflow consists of:
- A workflow definition (def_workflow table) - Metadata about the workflow
- Workflow nodes (def_workflows table) - Individual steps in the process
The context is the heart of a workflow - it stores all data as the workflow executes:
context = {
# Input data
"customer_id": "CUST001",
"loan_amount": 50000,
# Added by workflow nodes
"credit_score": 720, # Added by CALC node
"approval_status": "approved", # Added by DECISION node
"notification_sent": True # Added by EMAIL node
}
Insert a record into def_workflow:
INSERT INTO def_workflow (
Name,
Package,
Workflow,
Async,
Description
) VALUES (
'LOAN_APPLICATION',
'factory.core',
'loan_application',
'N', -- Synchronous workflow
'Processes loan applications from submission to approval'
);
Insert nodes into def_workflows:
-- Node 1: Calculate credit score
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchDirect,
Description
) VALUES (
'loan_application',
'factory.core',
'calculate_credit_score',
1,
'CALC',
'validate_application',
'Calculate applicant credit score'
);
-- Node 2: Validate application
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchSql,
BranchDirect,
Description
) VALUES (
'loan_application',
'factory.core',
'validate_application',
2,
'DECISION',
'SELECT CASE WHEN $credit_score$ >= 650 THEN "approve" ELSE "reject" END',
'send_decision_email',
'Validate application based on credit score'
);
-- Node 3: Send decision email
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchDirect,
Description
) VALUES (
'loan_application',
'factory.core',
'send_decision_email',
3,
'EMAIL',
'DONE',
'Send approval or rejection email'
);
-- Node 4: Done
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp
) VALUES (
'loan_application',
'factory.core',
'workflow_complete',
4,
'DONE'
);
A CALC node executes calculations defined in def_service_calculation:
-- Define calculation
INSERT INTO def_service_calculation (
Service,
Package,
Name,
Calc,
Description
) VALUES (
'calculate_credit_score',
'factory.core',
'credit_score',
'SELECT credit_score FROM customers WHERE customer_id = "$customer_id$"',
'Fetch credit score from database'
);
Usage in workflow:
context = {"customer_id": "CUST001"}
# After CALC node executes
# context = {"customer_id": "CUST001", "credit_score": 720}
Runs custom Python service classes:
# services/LoanValidationService.py
from ObjApi import ObjApi
class LoanValidationService(ObjApi):
    """Example custom service that approves or rejects a loan application."""

    def run_workflow_direct(self, run_context):
        """Custom validation logic.

        Reads ``loan_amount`` and ``credit_score`` from ``run_context``
        (each defaults to 0 when missing), records the outcome under
        ``validation_result`` together with either ``interest_rate`` or
        ``rejection_reason``, and returns the same dictionary.
        """
        loan_amount = run_context.get("loan_amount", 0)
        credit_score = run_context.get("credit_score", 0)

        # Custom business logic
        if credit_score >= 700 and loan_amount <= 100000:
            run_context["validation_result"] = "approved"
            run_context["interest_rate"] = 4.5
        elif credit_score >= 650:
            # Also reached by scores >= 700 whose loan_amount exceeds 100000.
            run_context["validation_result"] = "approved"
            run_context["interest_rate"] = 6.5
        else:
            run_context["validation_result"] = "rejected"
            run_context["rejection_reason"] = "Credit score too low"

        return run_context
A DECISION node processes business rules to determine outcomes:
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchSql,
Description
) VALUES (
'loan_application',
'factory.core',
'risk_assessment',
2,
'DECISION',
'SELECT CASE
WHEN $credit_score$ >= 750 THEN "low_risk"
WHEN $credit_score$ >= 650 THEN "medium_risk"
ELSE "high_risk"
END AS risk_level',
'Assess loan risk level'
);
An EMAIL node sends emails using templates:
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
NodeUpSql,
Description
) VALUES (
'loan_application',
'factory.core',
'send_approval_email',
5,
'EMAIL',
'SELECT
$user_email$ as email,
"Loan Application Approved" as subject,
CONCAT("Dear ", $user_name$, ", your loan application for $",
$loan_amount$, " has been approved.") as body',
'Send approval notification'
);
An SMS node sends SMS messages:
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
NodeUpSql,
Description
) VALUES (
'loan_application',
'factory.core',
'send_sms_notification',
6,
'SMS',
'SELECT
$phone_number$ as msisdn,
CONCAT("Your loan application has been ", $approval_status$) as message',
'Send SMS notification'
);
A DOCUMENT node generates or extracts documents:
-- Generate PDF from template
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
NodeUpExtra,
Description
) VALUES (
'loan_application',
'factory.core',
'generate_contract',
7,
'DOCUMENT',
'DOCTEMPLATE',
'Generate loan contract PDF'
);
A WEBHOOK node triggers webhooks or API calls:
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
NodeUpSql,
Description
) VALUES (
'loan_application',
'factory.core',
'notify_credit_bureau',
4,
'WEBHOOK',
'SELECT "https://api.creditbureau.com/notify" as url,
JSON_OBJECT("customer_id", $customer_id$,
"loan_amount", $loan_amount$) as payload',
'Notify credit bureau of application'
);
Unconditional transition to the next node:
UPDATE def_workflows
SET BranchDirect = 'send_email'
WHERE Name = 'validate_data';
Use SQL to determine the next node:
UPDATE def_workflows
SET BranchSql = '
SELECT CASE
WHEN $approval_status$ = "approved" THEN "send_approval_email"
WHEN $approval_status$ = "rejected" THEN "send_rejection_email"
ELSE "manual_review"
END AS next_node'
WHERE Name = 'decision_node';
Create multiple possible paths:
-- Risk-based routing
UPDATE def_workflows
SET BranchSql = '
SELECT CASE
WHEN $credit_score$ >= 750 THEN "auto_approve"
WHEN $credit_score$ >= 650 THEN "manual_review"
WHEN $credit_score$ >= 550 THEN "request_cosigner"
ELSE "auto_reject"
END AS next_step'
WHERE Name = 'risk_assessment';
Synchronous workflows (Async = 'N') execute immediately and return results:
UPDATE def_workflow
SET Async = 'N'
WHERE Workflow = 'loan_application';
# Blocks until complete
result = wf.Run("loan_application", context=context)
print(f"Final result: {result}")
Asynchronous workflows (Async = 'Y') are queued for background processing:
UPDATE def_workflow
SET Async = 'Y'
WHERE Workflow = 'bulk_email_campaign';
# Returns immediately, queues for processing
result = wf.Run("bulk_email_campaign", context=context)
print("Workflow queued for processing")
START
  ↓
CALC: Validate email uniqueness
  ↓
DECISION: Email available?
  ├─ Yes → CALC: Create user account
  │          ↓
  │        EMAIL: Send welcome email
  │          ↓
  │        DONE
  │
  └─ No → EMAIL: Send error notification
             ↓
           DONE
START
  ↓
CALC: Gather approval data
  ↓
DECISION: Auto-approve possible?
  ├─ Yes → EMAIL: Send approval
  │          ↓
  │        DONE
  │
  └─ No → FORMFLOW: Route to approver
             ↓
           WEBHOOK: Wait for approval
             ↓
           EMAIL: Send decision
             ↓
           DONE
START
  ↓
IMPORT: Import data file
  ↓
CALC: Validate data
  ↓
DECISION: Valid data?
  ├─ Yes → CALC: Transform data
  │          ↓
  │        EXPORT: Export to warehouse
  │          ↓
  │        EMAIL: Success notification
  │          ↓
  │        DONE
  │
  └─ No → EMAIL: Error notification
             ↓
           DONE
Use $variable_name$ syntax in SQL queries:
SELECT customer_name, credit_limit
FROM customers
WHERE customer_id = "$customer_id$"
AND account_status = "$account_status$"
Context variables are added by CALC nodes:
-- This calculation adds 'total_debt' to context
INSERT INTO def_service_calculation (
Service,
Package,
Name,
Calc
) VALUES (
'financial_analysis',
'factory.core',
'total_debt',
'SELECT SUM(outstanding_amount) as total_debt
FROM loans
WHERE customer_id = "$customer_id$"'
);
def run_workflow_direct(self, run_context):
    """Read values from the workflow context and add derived values to it.

    Reads ``customer_id`` and ``loan_amount`` (defaulting to 0) from
    ``run_context``, stores ``monthly_payment`` and ``processing_date``
    in it, and returns the same dictionary.
    """
    # Imported locally so the snippet is self-contained when copied.
    from datetime import datetime

    # Read from context
    customer_id = run_context.get("customer_id")  # shown only to illustrate reading a key
    loan_amount = run_context.get("loan_amount", 0)

    # Add to context
    run_context["monthly_payment"] = loan_amount * 0.05
    run_context["processing_date"] = datetime.now().isoformat()

    return run_context
# Test a workflow from command line
dev-env/bin/python factory.core/ObjWorkflow.py run LOAN_APPLICATION \
--guid "TEST001"
# Validate workflow definition
dev-env/bin/python factory.core/ObjWorkflow.py closure LOAN_APPLICATION
# Export workflow visualization
dev-env/bin/python factory.core/ObjWorkflow.py export LOAN_APPLICATION
from ObjWorkflow import Workflow
# Enable debugging
wf = Workflow(0)
# Run with verbose context
context = {
"customer_id": "TEST001",
"debug_mode": True
}
try:
result = wf.Run("LOAN_APPLICATION", guid="TEST001", context=context)
print(f"Success: {result}")
except Exception as e:
print(f"Error: {e}")
# Check workflow transitions table
# SELECT * FROM def_workflow_transitions WHERE guid = 'TEST001'
-- View workflow execution history
SELECT * FROM def_workflow_transitions
WHERE guid = 'TEST001'
ORDER BY created_at;
-- Check workflow definition
SELECT * FROM def_workflows
WHERE Workflow = 'loan_application'
ORDER BY Rank;
-- Find failed workflows
SELECT DISTINCT guid, workflow_code, error_message
FROM def_workflow_transitions
WHERE status = 'ERROR'
AND created_at > DATE_SUB(NOW(), INTERVAL 1 DAY);
Use descriptive node names: calculate_credit_score, not node1.
-- Add error handling nodes
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchDirect
) VALUES (
'loan_application',
'factory.core',
'handle_error',
99,
'EMAIL', -- Send error notification
'DONE'
);
Use clear, consistently prefixed context keys (loan_amount, loan_term, loan_type):
context = {
# Good: Namespaced and clear
"customer_id": "CUST001",
"loan_amount": 50000,
"loan_term_months": 60,
"loan_type": "mortgage",
# Avoid: Generic names
# "amount": 50000,
# "term": 60,
# "type": "mortgage"
}
Validate input values before they are substituted into SQL via the $variable$ syntax:
-- First node: Validate input
INSERT INTO def_workflows (
Workflow,
Package,
Name,
Rank,
NodeUp,
BranchSql
) VALUES (
'loan_application',
'factory.core',
'validate_input',
1,
'DECISION',
'SELECT CASE
WHEN "$customer_id$" REGEXP "^[A-Z0-9]+$"
AND $loan_amount$ > 0
AND $loan_amount$ < 1000000
THEN "process_application"
ELSE "reject_invalid_input"
END'
);
Check workflow exists:
SELECT * FROM def_workflow WHERE Workflow = 'loan_application';
Check nodes defined:
SELECT COUNT(*) FROM def_workflows WHERE Workflow = 'loan_application';
Identify circular references:
dev-env/bin/python factory.core/ObjWorkflow.py closure LOAN_APPLICATION
Check transition history:
SELECT Name, Rank, COUNT(*) as execution_count
FROM def_workflow_transitions
WHERE guid = 'STUCK001'
GROUP BY Name, Rank
HAVING COUNT(*) > 1;
Verify variable names:
-- Should use $customer_id$ not ${customer_id} or %customer_id%
SELECT * FROM customers WHERE customer_id = "$customer_id$"
Check context contains variable:
result = wf.Run("WORKFLOW", context={"customer_id": "CUST001"})
# Make sure context has the variable before node uses it
Check MongoDB queue:
from ObjDataMongo import ObjDataMongo
mongo = ObjDataMongo(0)
pending = mongo.mongo_pipeline("workflow_queue_context", [
{"$match": {"processed": {"$exists": False}}}
])
print(f"Pending workflows: {len(pending)}")
Verify async worker running:
ps aux | grep "ObjWorkflow"
# Should see worker process
Here's a complete end-to-end example:
from ObjWorkflow import Workflow
from datetime import datetime
# Step 1: Initialize workflow engine
wf = Workflow(0)
# Step 2: Prepare context with application data
context = {
"customer_id": "CUST001",
"application_id": f"APP{datetime.now().strftime('%Y%m%d%H%M%S')}",
"loan_amount": 75000,
"loan_term_months": 360,
"loan_purpose": "home_purchase",
"applicant_name": "John Doe",
"applicant_email": "john@example.com",
"applicant_phone": "+1234567890",
"employment_status": "employed",
"annual_income": 85000,
"requested_date": datetime.now().isoformat()
}
# Step 3: Execute workflow
try:
    print("Submitting loan application...")
    result = wf.Run(
        workflow_code="LOAN_APPLICATION",
        guid=context["application_id"],
        context=context
    )
    print(f"\n✓ Workflow completed successfully")
    print(f"Application ID: {context['application_id']}")
    print(f"Approval Status: {result.get('approval_status', 'pending')}")
    print(f"Credit Score: {result.get('credit_score', 'N/A')}")
    if result.get('approval_status') == 'approved':
        print(f"Interest Rate: {result.get('interest_rate')}%")
        # monthly_payment may be absent from the result; formatting None
        # with ":.2f" raises TypeError, which the handler below would
        # misreport as a workflow failure.
        monthly_payment = result.get('monthly_payment')
        if monthly_payment is not None:
            print(f"Monthly Payment: ${monthly_payment:.2f}")
        else:
            print("Monthly Payment: N/A")
    elif result.get('approval_status') == 'rejected':
        print(f"Rejection Reason: {result.get('rejection_reason')}")
except Exception as e:
    # Top-level boundary of the example script: report and point to the logs.
    print(f"\n✗ Workflow failed: {e}")
    print("Check logs for details")