WebHooks.py contains 50+ hardcoded SQL queries that should be moved to
factory.core/ObjHook.yaml following the project pattern.
| Line(s) | Method | Query Purpose | Priority |
|---|---|---|---|
| 236-278 | Read() | Load webhook definition | HIGH |
| 327-338 | Read() | Load input parameters | HIGH |
| 359-370 | Read() | Load output parameters | HIGH |
| 417-425 | _add_webhook_columns() | Add bloom columns | MEDIUM |
| 1006-1009 | _validate_payload() | Get validation list | FIXED |
| 1065-1082 | _get_output_validation_list() | Get output validations | FIXED |
| 1126-1132 | _validate_output_required() | Get required params | HIGH |
| 1182-1191 | _update_base_table() | Update bloom table | MEDIUM |
| 1399-1407 | commit() | Count reflections | HIGH |
| 1419-1428 | commit() | Get reflection list | HIGH |
| 1436-1440 | commit() | Get payload type | MEDIUM |
| 1689-1695 | RenderResult() | Get example values | LOW |
| 1723-1730 | RenderResult() | Get post type | MEDIUM |
| 1875-1881 | RenderResult() | Update bloom results | MEDIUM |
| 1960-1965 | _validate_api_key() | Validate username/password | HIGH |
| 1986-1993 | _validate_api_key() | Update user session | MEDIUM |
| 2048-2050 | set_parameter() | Get normal parameter | LOW |
| 2077-2083 | test_payload_case1() | Get test data | LOW |
| 2233-2236 | ReadSet() | Get batch definition | HIGH |
| 2242-2244 | ReadSet() | Count batch volume | MEDIUM |
| 2255-2258 | ReadSet() | Get batch GUIDs | HIGH |
| 2268-2276 | ReadSet() | Get batch data (N+1 ISSUE) | CRITICAL |
Total: 22+ query locations, many with multiple queries
Before (Lines 1006-1009):
def _get_validation_list(self, package: str, archetype: str) -> list:
sql = f"""
SELECT ValidationSql, ValidationNote, remote_connection, Description
FROM def_webhook_validations
WHERE webhookcode = '{self._Webhookcode}'
AND package IN ('{package}', '{archetype}')
ORDER BY Rank DESC;
"""
validation_list = self.sql_get_list(sql)
return validation_list
After:
def _get_validation_list(self, package: str, archetype: str) -> list:
query_template = self.get_query_from_yaml(
"get_input_validations",
"factory.core/ObjHook.yaml"
)
sql = query_template.format(
webhook_code=self.escape_sql(self._Webhookcode),
package=self.escape_sql(package),
archetype=self.escape_sql(archetype)
)
validation_list = self.sql_get_list(sql)
return validation_list
Added to ObjHook.yaml:
queries:
get_input_validations: |
SELECT
ValidationSql,
ValidationNote,
remote_connection,
Description
FROM def_webhook_validations
WHERE webhookcode = '{webhook_code}'
AND package IN ('{package}', '{archetype}')
AND COALESCE(Active, 'Y') = 'Y'
ORDER BY Rank DESC
1. Read() Method Queries (Lines 236-370)
Move to ObjHook.yaml:
queries:
read_webhook_definition: |
SELECT * FROM def_webhook
WHERE WebhookCode LIKE '{webhook_code}'
AND Direction = 'IN'
AND if(ifnull(Package,'')='','CORE',Package)
in ('{package}','CORE','ADHOC')
read_webhook_input_parameters: |
SELECT DISTINCT
Parameter,
COALESCE(NormalParameter, ''),
COALESCE(Required, 'N'),
COALESCE(Encoding, '')
FROM def_webhook_parameters
WHERE PostType IN ('POST', 'HEAD', 'GET', 'DERIVED', 'ENCODED')
AND webhookcode = '{webhook_code}'
AND COALESCE(Package, 'CORE')
IN ('{package}', 'CORE', 'ADHOC')
ORDER BY Rank
read_webhook_output_parameters: |
SELECT DISTINCT
Parameter,
NormalParameter,
COALESCE(Example, ''),
ValueType
FROM def_webhook_parameters
WHERE PostType = 'RETURN'
AND webhookcode = '{webhook_code}'
AND COALESCE(Package, 'CORE')
IN ('{package}', 'CORE', 'ADHOC')
ORDER BY Rank
Refactor Read() method:
def Read(self, webhook_code: str, build_yaml="") -> None:
# ... validation code ...
# Load webhook definition
query = self.get_query_from_yaml(
"read_webhook_definition",
"factory.core/ObjHook.yaml"
)
sql = query.format(
webhook_code=self.escape_sql(self.WebhookCode),
package=self.escape_sql(self.Package)
)
ret = self.sql_read_object(sql)
# Load input parameters
query = self.get_query_from_yaml(
"read_webhook_input_parameters",
"factory.core/ObjHook.yaml"
)
sql = query.format(
webhook_code=self.escape_sql(self._Webhookcode),
package=self.escape_sql(self.Package)
)
parameters = self.sql_get_list(sql)
INPUT_PARAMETER_BUFFER[self._Webhookcode] = parameters
# ... rest of method
2. Reflection Queries (Lines 1399-1440)
queries:
count_webhook_reflections: |
SELECT count(*)
FROM def_webhook_reflections
WHERE WebhookCode = '{webhook_code}'
AND package in ('{package}', '{archetype}')
AND Active = 'Y'
get_webhook_reflections: |
SELECT
MirrorCode,
MirrorDelay,
coalesce(LookupSql,''),
'' as MirrorMode,
MirrorRemote,
MirrorTable
FROM def_webhook_reflections
WHERE WebhookCode = '{webhook_code}'
AND package in ('{package}', '{archetype}')
AND Active = 'Y'
get_reflection_payload_type: |
SELECT PayloadType, ReflectionType
FROM def_webhook
WHERE WebhookCode = '{webhook_code}'
AND package in ('{package}', '{archetype}')
3. Batch Processing N+1 Query (Lines 2268-2276) - CRITICAL
Before (N+1 anti-pattern):
guid_list = self.sql_get_list(sql) # Gets list of GUIDs
for guid_rec in guid_list:
# SEPARATE QUERY FOR EACH GUID!
sql = f"SELECT * FROM {self._Inputtable} WHERE {self._Inputguid} = '{guid_rec}'"
data_set = self.sql_get_dictionary(sql)
batch[guid_rec] = data_set
After (Single query with IN clause):
queries:
get_batch_data_bulk: |
SELECT *
FROM {input_table}
WHERE {input_guid} IN ({guid_list})
{completed_filter}
LIMIT {limit}
# Build comma-separated list
guid_placeholders = ','.join([f"'{self.escape_sql(g)}'" for g in guid_list])
query = self.get_query_from_yaml("get_batch_data_bulk", "factory.core/ObjHook.yaml")
sql = query.format(
input_table=self._Inputtable,
input_guid=self._Inputguid,
guid_list=guid_placeholders,
completed_filter=f"AND {self._Completedfilter}" if self._Completedfilter else "",
limit=QUEUE_SIZE
)
# Single query returns all records
result_set = self.sql_get_all_dict(sql) # Returns list of dicts
for row in result_set:
guid = row[self._Inputguid]
batch[guid] = row
4. Bloom Table Updates (Lines 1182-1191, 1875-1881)
queries:
update_bloom_base_table: |
UPDATE {table_base} SET
bloom_status = '{status}',
bloom_status_code = {status_code},
bloom_status_note = '{status_note}',
bloom_validation_description = '{validation_description}',
bloom_validation_resultnote = '{validation_note}',
bloom_rdg = '{random_digit}',
bloom_remoteip = '{remote_ip}',
bloom_useragent = '{user_agent}',
bloom_connectionId = CONNECTION_ID()
WHERE base_{guid_name} = '{base_guid}'
update_bloom_results: |
UPDATE {table_base}
SET bloom_results = '{results_json}'
WHERE base_{guid_name} = '{base_guid}'
5. Parameter Lookup (Lines 2048-2050)
queries:
get_normal_parameter_name: |
SELECT Parameter
FROM Def_WebHook_Parameters
WHERE WebhookCode like '{webhook_code}'
AND NormalParameter = '{parameter}'
AND package = '{package}'
6. Test Methods (Lines 2077-2083)
queries:
get_test_payload_data: |
SELECT guid, idnumber, coalesce(cs_cell_phone_1,'')
FROM bloom_donormalenquiry
ORDER BY RAND()
LIMIT 1
7. Mock Data (Lines 1689-1695)
queries:
get_parameter_example: |
SELECT Example, ValueType
FROM Def_Webhook_parameters
WHERE posttype in ('RETURN','JSON')
AND webhookcode = '{webhook_code}'
AND Parameter = '{parameter}'
queries:
your_query_name: |
SELECT column1, column2
FROM table_name
WHERE condition = '{parameter}'
Before:
sql = f"""
SELECT column1, column2
FROM table_name
WHERE condition = '{self.value}'
"""
result = self.sql_get_list(sql)
After:
query_template = self.get_query_from_yaml(
"your_query_name",
"factory.core/ObjHook.yaml"
)
sql = query_template.format(
parameter=self.escape_sql(self.value)
)
result = self.sql_get_list(sql)
Always use escape_sql() for user input:
webhook_code=self.escape_sql(self._Webhookcode)
Table/column names from trusted sources only:
# OK - from database schema
input_table=self._Inputtable
# NOT OK - from user input
table_name=user_provided_table # SQL injection risk!
Validate identifiers before formatting:
if not self._Inputtable.isidentifier():
raise ValueError("Invalid table name")
Estimated effort: 4-6 hours for complete refactoring
Create test for each refactored query:
def test_get_input_validations_query():
hook = WebHooks()
hook._Webhookcode = 'TEST_HOOK'
validations = hook._get_validation_list('homechoice', 'CORE')
assert isinstance(validations, list)
# Verify query executed without error
Test full webhook flow:
def test_webhook_read_with_yaml_queries():
hook = WebHooks()
hook.Read('CUSTOMER_CREATE')
assert hook._Webhookcode == 'CUSTOMER_CREATE'
assert len(hook.params) > 0
assert len(hook.returns) > 0
Verify escape_sql() prevents injection:
def test_sql_injection_prevented():
hook = WebHooks()
hook._Webhookcode = "'; DROP TABLE users; --"
# Should not execute DROP TABLE
hook._get_validation_list('homechoice', 'CORE')
# Verify users table still exists
assert hook.has_table('users')
This refactoring addresses several issues from the code analysis:
feat/webhooks_sql_yaml_refactor