Source: factory.core/ObjDataImport.py
| Method | Signature | Description |
|---|---|---|
| get_trigger_mask | get_trigger_mask(import_code) | |
| check_trigger | check_trigger(directory, trigger_mask) | |
| process_event | process_event(src_path) | |
| on_created | on_created(event) | |
| on_modified | on_modified(event) | |
| on_moved | on_moved(event) | |
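The trigger methods carry no descriptions in this export. As a hedged illustration only, `check_trigger` presumably tests whether a file matching a trigger mask exists in a directory before an import is allowed to start; a minimal sketch assuming glob-style masks (the module's actual matching rules are not shown here):

```python
import fnmatch
import os

def check_trigger(directory: str, trigger_mask: str) -> bool:
    """Hypothetical re-implementation: return True when any file in
    `directory` matches the glob-style `trigger_mask`."""
    try:
        entries = os.listdir(directory)
    except OSError:
        # Missing or unreadable directory: no trigger present.
        return False
    return any(fnmatch.fnmatch(name, trigger_mask) for name in entries)
```

The `on_created`/`on_modified`/`on_moved` names match the event-handler interface of the watchdog library, suggesting these wrap filesystem notifications around `process_event`.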
Tracks performance metrics across all imports. Provides real-time visibility into import performance, error rates,
and overall system health. Metrics are shared across all import instances.
| Method | Signature | Description |
|---|---|---|
| debug | debug(*args) | Debug logging for ImportMetrics. |
| record_import | record_import(rows: int, elapsed: float, success: bool = True) | Record metrics from a completed import. |
| get_stats | get_stats() -> dict | Get current performance statistics. |
| print_stats | print_stats() | Print formatted statistics to debug log. |
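The "shared across all import instances" note suggests the metrics live at class (or module) scope rather than per instance. A minimal sketch of that pattern, assuming class-level state with a lock (the real class's fields and derived statistics may differ):

```python
import threading

class ImportMetrics:
    """Hypothetical sketch of a process-wide metrics accumulator.
    State is class-level so every importer instance shares it."""

    _lock = threading.Lock()
    _stats = {"imports": 0, "failures": 0, "rows": 0, "elapsed": 0.0}

    @classmethod
    def record_import(cls, rows: int, elapsed: float, success: bool = True):
        with cls._lock:
            cls._stats["imports"] += 1
            cls._stats["rows"] += rows
            cls._stats["elapsed"] += elapsed
            if not success:
                cls._stats["failures"] += 1

    @classmethod
    def get_stats(cls) -> dict:
        with cls._lock:
            s = dict(cls._stats)
        # Derived figures; guard against division by zero on a fresh start.
        s["rows_per_sec"] = s["rows"] / s["elapsed"] if s["elapsed"] else 0.0
        s["error_rate"] = s["failures"] / s["imports"] if s["imports"] else 0.0
        return s
```

Because the state is class-level, `ImportMetrics.get_stats()` reflects every import recorded anywhere in the process, which is what gives the real-time error-rate view described above.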
Manages the entire data import process.
| Method | Signature | Description |
|---|---|---|
| patch_param | patch_param(text: str, depth: int = 0) -> str | Replaces placeholder parameters in a string with their values. |
| pre_sql | pre_sql() | Executes a pre-SQL script if defined in the import configuration. |
| post_sql | post_sql() | Executes a post-SQL script if defined in the import configuration. |
| cleanup_old_import_tables | cleanup_old_import_tables() | Removes temporary import tables older than two days. |
| process | process(guid, data_import_code: str) | Main processing function for a data import task. |
| run_workflow | run_workflow(workflow_name) | Runs a specified workflow. |
| update_stage | update_stage(guid: str, success: str) | Updates the status of the import task in the staging table. |
| factory_object | factory_object(filename: str) | Dynamically loads the correct file import handler. |
| get_file_guid | get_file_guid(filename: str, block_size: int = 65536) -> str | Generates an MD5 hash for a file to serve as its unique ID. |
| must_import | must_import(data_import_code, directory, filename) -> bool | Checks if a file should be imported. |
| update_tracking | update_tracking(data_import_code, directory, filename, file_size, file_rows, workflow = '') | Logs the import event. |
| sanitize_column_names | sanitize_column_names(columns: list) -> list | Cleans and sanitizes column names to be SQL-friendly. |
| commit_block | commit_block() | Commits a block of rows to the temporary import table using executemany. |
| commit_block_fast | commit_block_fast() | High-performance import using LOAD DATA INFILE (10-50x faster than executemany). |
| write_segments | write_segments(columns, row_max) | |
| commit_data | commit_data(cols, max_rows = None) -> bool | Orchestrates the data writing process. |
| import_file | import_file(file_name: str, directory: str = '') -> bool | Handles the import process for a single file. |
| write_metrics_to_db | write_metrics_to_db(filename: str, file_size_bytes: int, import_session_guid: str, file_hash: str \| None, row_count: int, elapsed_seconds: float, memory_usage_mb: float \| None, success: bool, normalization_skipped: bool = False, error_message: str \| None = None) | Persists import metrics to the def_dataimport_metric table. |
| send_status_message | send_status_message(message: str) | Sends a status message to the current package's channel. |
| cleanup_old_metrics | cleanup_old_metrics(days_to_keep: int = 90, batch_size: int = 10000) -> dict | Removes metrics older than the specified number of days. |
| detect_anomalies | detect_anomalies(import_code: str \| None = None, sensitivity: str = 'medium', days_baseline: int = 30) -> list | Detects performance anomalies using statistical analysis. |
| import_directory | import_directory(filename_mask, directory: str) | Imports all files in a directory matching a mask. |
| new_column | new_column(table, col, col_type) | Adds a new column to a table if it doesn't exist. |
| add_key | add_key(table, col) | Adds an index to a table column. |
| create_table | create_table(columns) | Creates the temporary table for the import. |
| process_text | process_text(text) | Processes text using the ObjProcessText module. |
| direct_import | direct_import(guid = '', import_code = '') | Initiates a direct, on-demand import. |
| download_test_data | download_test_data(dataset_name: str = 'all') | Downloads test datasets to local.documents/test for comprehensive testing. |
| get_trigger_mask | get_trigger_mask(import_code) | |
| check_trigger | check_trigger(directory, trigger_mask) | |
| scan_and_import | scan_and_import(watch_config, imported_files, worker_queue) | Scans watched directories for new files to import. |
| watch | watch() | Watches directories for new files based on the configuration. |
| get_watch_config | get_watch_config(use_cache = True) -> dict | Retrieves the directory watch configuration from the database. |
| reload_watch_config | reload_watch_config() | Forces a reload of the watch configuration (clears the cache). |
| run_once | run_once() | Scans for all matching import files, runs the imports once, and then exits. |
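The `get_file_guid` description (block-wise MD5 of the file contents) corresponds to the standard hashlib pattern; a sketch of that technique:

```python
import hashlib

def get_file_guid(filename: str, block_size: int = 65536) -> str:
    """Hash the file in block_size chunks so large files never have
    to be held in memory at once; the hex digest is the file's ID."""
    md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            md5.update(block)
    return md5.hexdigest()
```

Using the content hash as the ID means a renamed or re-delivered copy of the same file produces the same GUID, which is what lets `must_import` skip files that were already loaded.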
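`sanitize_column_names` turns arbitrary file headers into SQL-friendly identifiers. A minimal sketch of the usual rules (lowercase, non-alphanumerics collapsed to underscores, no leading digit, duplicates suffixed); the module's actual rules may differ:

```python
import re

def sanitize_column_names(columns: list) -> list:
    """Hypothetical sketch: make header names safe to use as SQL columns."""
    seen = {}
    result = []
    for col in columns:
        name = re.sub(r"[^0-9a-zA-Z]+", "_", str(col).strip()).strip("_").lower()
        if not name:
            name = "col"                # blank header fallback
        if name[0].isdigit():           # SQL identifiers should not start with a digit
            name = "c_" + name
        count = seen.get(name, 0)       # de-duplicate repeated headers
        seen[name] = count + 1
        result.append(name if count == 0 else f"{name}_{count + 1}")
    return result
```

De-duplication matters because `create_table` would otherwise fail on a file whose header row repeats a column name.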
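`commit_block_fast` is described as using LOAD DATA INFILE. The usual shape of that technique is to spool the buffered rows to a temporary CSV and hand MySQL a single bulk-load statement instead of thousands of executemany round trips. A sketch that only builds the statement (`write_load_data_sql` is a hypothetical helper, not part of the module; executing the SQL requires a MySQL connection with `local_infile` enabled):

```python
import csv
import os
import tempfile

def write_load_data_sql(rows, columns, table):
    """Spool rows to a temp CSV and return (csv_path, LOAD DATA statement)."""
    fd, path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    cols = ", ".join(f"`{c}`" for c in columns)
    sql = (
        f"LOAD DATA LOCAL INFILE '{path}' INTO TABLE `{table}` "
        "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
        f"LINES TERMINATED BY '\\n' ({cols})"
    )
    return path, sql
```

The 10-50x speedup quoted above is plausible for this pattern because the server parses one statement and streams the file, rather than round-tripping per batch; the caller must delete the temp file afterwards.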
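`detect_anomalies` is described only as "statistical analysis"; a common approach is a z-score test against a baseline window. A simplified sketch under that assumption, operating on a plain list of samples rather than the real method's `import_code`/`days_baseline` lookup, with invented sensitivity thresholds:

```python
from statistics import mean, stdev

# Hypothetical sensitivity -> z-score threshold mapping.
THRESHOLDS = {"low": 3.0, "medium": 2.0, "high": 1.5}

def detect_anomalies(samples, sensitivity: str = "medium") -> list:
    """Flag samples deviating from the baseline mean by more than the
    chosen number of sample standard deviations."""
    if len(samples) < 3:
        return []                      # not enough history for a baseline
    mu, sigma = mean(samples), stdev(samples)
    if sigma == 0:
        return []                      # perfectly flat baseline: nothing to flag
    limit = THRESHOLDS.get(sensitivity, 2.0)
    return [
        {"value": v, "z_score": round((v - mu) / sigma, 2)}
        for v in samples
        if abs(v - mu) / sigma > limit
    ]
```

In the real method the samples would presumably be per-import metrics (elapsed seconds, row counts) pulled from def_dataimport_metric for the baseline window.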
- Optimized integer detection using try/except (2-3x faster).
- Optimized float detection using try/except (2-3x faster).
- Optimized date detection (YYYY-MM-DD format).
- Optimized time detection (HH:MM:SS or HH:MM format).
- Optimized datetime detection (YYYY-MM-DD HH:MM:SS format).
- Periodically displays the contents of the worker queue in a rich table.
- Shows a list of available imports and runs the selected one.
- Runs a direct import for a given import code.
- Watches directories for new files based on the configuration.
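The try/except approach named above is the standard idiom: attempt the conversion and let the exception signal "not this type", which beats regex matching when most values really are of the expected type. A sketch of the int/float/date checks (the module's exact accept/reject rules are not shown in this export):

```python
from datetime import datetime

def is_int(value) -> bool:
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False

def is_float(value) -> bool:
    # Note: this accepts 'nan' and 'inf'; the real detector may exclude them.
    try:
        float(value)
        return True
    except (TypeError, ValueError):
        return False

def is_date(value) -> bool:
    # strptime also validates the calendar, so '2024-02-30' is rejected.
    try:
        datetime.strptime(value, "%Y-%m-%d")
        return True
    except (TypeError, ValueError):
        return False
```

The time and datetime checks would follow the same pattern with `"%H:%M:%S"` / `"%Y-%m-%d %H:%M:%S"` formats.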