Source: factory.core/ObjWorkflow.py
Base exception for all workflow execution errors.
This is the parent class for all workflow-specific exceptions. Use this
when catching workflow errors generically, or subclass it for specific
error conditions.
...
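As a minimal sketch of the catch-generically-or-subclass pattern described above: the class names `WorkflowError` and `WorkflowNotFoundError`, and the helpers `load_workflow` and `safe_load`, are hypothetical stand-ins, since this reference does not show the actual exception names.

```python
from typing import Optional


class WorkflowError(Exception):
    """Hypothetical name for the base workflow exception."""


class WorkflowNotFoundError(WorkflowError):
    """Hypothetical subclass for one specific error condition."""


def load_workflow(code: str, known: set) -> str:
    # Stand-in for a real lookup; raises the specific subclass on a miss.
    if code not in known:
        raise WorkflowNotFoundError(f"workflow {code!r} not found")
    return code


def safe_load(code: str, known: set) -> Optional[str]:
    # Catching the base class also catches every subclass of it.
    try:
        return load_workflow(code, known)
    except WorkflowError:
        return None
```

Catching the base class keeps generic handlers short, while callers that care about one condition can still catch the subclass directly.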
Raised when a workflow code is not found in the database.
This error indicates that the requested workflow does not exist in
def_workflows or def_workflow tables.
Raised when workflow preparation fails.
This error occurs during prepare_run() when:
Raised when workflow configuration is invalid.
This error indicates problems with workflow structure:
Raised when workflow exceeds maximum transition limit.
This error prevents infinite loops by enforcing a maximum number of
transitions (default: 100). Usually indicates:
Raised when a workflow node fails during execution.
This error wraps exceptions raised by individual node executors
(API, Service, Calculation, etc.) and provides workflow context.
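One way such wrapping might look, assuming a hypothetical `NodeExecutionError` class and a hypothetical `run_node` helper (neither name is taken from the source):

```python
class NodeExecutionError(Exception):
    """Hypothetical wrapper that adds workflow context to a node failure."""

    def __init__(self, workflow_code: str, node: str, cause: Exception) -> None:
        super().__init__(
            f"node {node!r} failed in workflow {workflow_code!r}: {cause}"
        )
        self.workflow_code = workflow_code
        self.node = node


def run_node(workflow_code: str, node: str, executor):
    # `executor` is any zero-argument callable standing in for a node executor.
    try:
        return executor()
    except Exception as exc:
        # `raise ... from exc` stores the original error on __cause__,
        # so the underlying traceback is not lost.
        raise NodeExecutionError(workflow_code, node, exc) from exc
```

Chaining with `from` lets a caller log the workflow-level message and still inspect the original exception.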
Raised when workflow execution exceeds timeout limit.
This error prevents workflows from running indefinitely by enforcing
execution time limits configured in WorkflowConfig.
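A cooperative deadline check is one plausible way to enforce such a limit. The sketch below is an assumption about the general technique, not the module's actual mechanism; `WorkflowTimeoutError`, `run_steps`, and the `clock` parameter are hypothetical names.

```python
import time


class WorkflowTimeoutError(Exception):
    """Hypothetical name for the timeout exception."""


def run_steps(steps, timeout_seconds: float, clock=time.monotonic):
    """Run callables in order, checking the deadline before each one."""
    deadline = clock() + timeout_seconds
    results = []
    for step in steps:
        # The check is cooperative: a step that is already running
        # is not interrupted, only the next step is refused.
        if clock() > deadline:
            raise WorkflowTimeoutError(f"exceeded {timeout_seconds}s limit")
        results.append(step())
    return results
```

Using a monotonic clock avoids false timeouts when the system wall clock is adjusted during a run.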
Raised when async workflow queueing fails.
This error occurs when:
Workflow execution configuration.
Centralizes all workflow execution flags and settings.
Replaces global DO_* flags with a type-safe configuration class.
| Method | Signature | Description |
|---|---|---|
| from_env | from_env(cls) -> 'WorkflowConfig' | Create configuration from environment variables and config.yaml. |
| development_mode | development_mode(cls) -> 'WorkflowConfig' | Create configuration for development/debugging. |
| production_mode | production_mode(cls) -> 'WorkflowConfig' | Create configuration for production. |
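The three constructors above could be sketched as classmethods on a frozen dataclass. Only the class name and method names come from this reference; the field names, the `WF_*` environment variable names, the 300-second timeout, and the extra `env` parameter (added here so the example can be exercised without touching the real environment) are all assumptions, and the config.yaml step is omitted.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkflowConfig:
    # Field names are illustrative assumptions, not the real attributes.
    debug: bool = False
    max_transitions: int = 100  # default transition limit cited in this reference
    timeout_seconds: float = 300.0  # assumed value

    @classmethod
    def from_env(cls, env=None) -> "WorkflowConfig":
        # Reads hypothetical WF_* variables; loading config.yaml is left out.
        env = os.environ if env is None else env
        return cls(
            debug=env.get("WF_DEBUG", "0") == "1",
            max_transitions=int(env.get("WF_MAX_TRANSITIONS", "100")),
            timeout_seconds=float(env.get("WF_TIMEOUT_SECONDS", "300")),
        )

    @classmethod
    def development_mode(cls) -> "WorkflowConfig":
        return cls(debug=True)

    @classmethod
    def production_mode(cls) -> "WorkflowConfig":
        return cls(debug=False)
```

Compared with module-level `DO_*` flags, a frozen instance cannot be mutated mid-run, and each field carries an explicit type.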
Enum of workflow kickoff types.
| Method | Signature | Description |
|---|---|---|
| sql_list | sql_list(cls) -> str | Returns SQL-formatted comma-separated list of quoted values |
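A helper like `sql_list` might be written roughly as follows. The enum name `KickoffType` and its members are hypothetical placeholders, and the exact separator used by the real method is not shown in this reference.

```python
from enum import Enum


class KickoffType(Enum):
    # Member names and values are hypothetical placeholders.
    MANUAL = "manual"
    SCHEDULED = "scheduled"
    WEBHOOK = "webhook"

    @classmethod
    def sql_list(cls) -> str:
        # Quote each value and join with commas, e.g. for an IN (...) clause.
        return ", ".join(f"'{member.value}'" for member in cls)


# Example use inside a query string built from constant enum values.
query = f"SELECT * FROM some_table WHERE kickoff IN ({KickoffType.sql_list()})"
```

Interpolating values this way is reasonable only because they are fixed constants defined in code; user-supplied values should go through bound query parameters instead.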
| Method | Signature | Description |
|---|---|---|
| execute | execute(original_self: Any, run_context: WorkflowContextType, current_result: str, input_guid: str, node_up: str, name: str, **kwargs) -> NodeExecuteResultType | |
Provides a Workflow class to manage various workflow processes.
The Workflow class is a specialized extension designed to handle workflow
operations, manage connected workflows, and execute synchronous or
asynchronous tasks. It integrates with a message queue system, handles Kafka
...
| Method | Signature | Description |
|---|---|---|
| running | running() -> bool | Thread-local workflow running flag |
| running | running(value: bool) -> None | Set thread-local workflow running flag |
| last_state | last_state() -> str | Thread-local workflow last state |
| last_state | last_state(value: str) -> None | Set thread-local workflow last state |
| run_context | run_context() -> WorkflowContextType | Thread-local workflow run context |
| run_context | run_context(value: WorkflowContextType) -> None | Set thread-local workflow run context |
| sim_context | sim_context() -> WorkflowContextType | Thread-local simulation context |
| sim_context | sim_context(value: WorkflowContextType) -> None | Set thread-local simulation context |
| set_user | set_user(user: str) -> None | |
| set_kafka | set_kafka(kafka: Any = 0) -> None | |
| local_patch_param | local_patch_param(context: WorkflowContextType, guid: str, processguid: str, text: str = '') -> str | Replaces placeholders in a text string with corresponding values from the provided context, |
| get_workflow_basename | get_workflow_basename(webhook_code: str) -> tuple[str, str] | Fetches the workflow name and description based on a provided webhook code. The method |
| get_connected_workflows | get_connected_workflows(workflow_code: str, guid: str, context: WorkflowContextType, package: str, archetype: str) -> None | |
| is_workflow_gui | is_workflow_gui(workflow_code: str) -> bool | Check if workflow is GUI-based (thread-safe cached lookup) |
| is_workflow_async | is_workflow_async(workflow_code: str) -> bool | Check if workflow is asynchronous (thread-safe cached lookup) |
| Run | Run(workflow_code: str, guid: str = '', context: Optional[WorkflowContextType] = None, start_time: Any = '', rerun: int = 0, simul_guid: str = '', force_async_run: bool = False) -> WorkflowContextType | Execute a workflow by its code, handling both synchronous and asynchronous execution. |
| prepare_run | prepare_run(workflow_code: str, guid: str, context: WorkflowContextType, package: str, archetype: str) -> tuple[Optional[str], list[Any], Optional[dict], Optional[str], Optional[str], Optional[str], str] | Prepares a runtime environment for a workflow execution by checking and updating |
| run_workflow | run_workflow(workflow_buffer: WorkflowBufferType, workflow_code: str, guid: str, current_guid: str, context: WorkflowContextType, start_time: float, simul_guid: str = '', resume_node: str = '', root_guid: str = '') -> WorkflowContextType | Core workflow execution engine that traverses and executes a workflow DAG. |
| get_context | get_context() -> WorkflowContextType | |
| walk_nodes | walk_nodes(current_node: str, nodes: dict[str, Any], walk: list[str], context: list[Any]) -> None | Recursively walk through a list of nodes and build up a list of walked nodes and context inputs. |
| render_context_resultset | render_context_resultset(run_context: JsonDict) -> None | Simple debug helper for rendering workflow context results. |
| Select | Select(Param1: str, Param2: str) -> None | |
| run_set | run_set(workflow_name: str, dispatcher_role: str) -> None | |
| serve_set | serve_set() -> None | |
| stage_update | stage_update(workflow_code: str, guid: str, context: WorkflowContextType, wf_obj_result: JsonDict) -> None | |
| save_to_stage | save_to_stage(task_guid: str, root_guid: str, workflow_code: str, resume_node: str = '', scheduled_time = None, context: str = '', fanin_node: str = '', fanin_subguids: str = '[]', status: str = 'PENDING') -> None | |
| dispatch_stage_items | dispatch_stage_items(queue: Any, package: str) -> int | Poll stage_workflow for ready items and push them to the message queue. |
| reactivate_stage_item | reactivate_stage_item(task_guid: str) -> WorkflowContextType | Load a stage_workflow row by guid and resume execution inline. |
| launch_service | launch_service(role: str) -> None | |
| run_direct | run_direct(workflow_code: str, guid: str, context: Optional[WorkflowContextType] = None) -> WorkflowContextType | |
| generate_mermaid | generate_mermaid(workflow_name: str, package: str = '', direction: str = 'LR', palette: dict \| None = None) -> str | Generate a Mermaid flowchart for a workflow. |
| send_email | send_email(workflow_name: str, recipients: str = '', package: str = '', with_ai: bool = False) -> bool | Send a workflow report via email. |
| review | review(workflow_code: str, prompt: str = '', model: str = '') -> str | Generate an AI-powered workflow review for documentation. |
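The paired getter and setter rows for `running`, `last_state`, `run_context`, and `sim_context` suggest properties backed by thread-local storage. A minimal sketch of that pattern for one flag, using a hypothetical `WorkflowSketch` class rather than the real `Workflow` implementation:

```python
import threading


class WorkflowSketch:
    # Simplified stand-in; only the thread-local property pattern is shown.
    # The storage is class-level, so instances share it within one thread.
    _local = threading.local()

    @property
    def running(self) -> bool:
        # Each thread sees its own value; default to False when unset.
        return getattr(self._local, "running", False)

    @running.setter
    def running(self, value: bool) -> None:
        self._local.running = value


wf = WorkflowSketch()
wf.running = True  # set in the main thread only

seen_in_worker = []
worker = threading.Thread(target=lambda: seen_in_worker.append(wf.running))
worker.start()
worker.join()
```

Here the worker thread reads the default value because the assignment in the main thread is not visible to it, which is the property that keeps concurrent workflow runs from overwriting each other's state.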
Generate an AI-powered workflow review for documentation.
Send a branded workflow report via email.
Resume a PAUSED workflow execution from stage_workflow by guid.
Wl = HookSet()
Run TEST_WORKFLOW and assert calculation results are stable.