Service queue engine. Loads service definitions from
def_service, resolves the factory module for the
service type, builds payloads, executes Connect/Send,
and writes results back to stage_service.

    Objects.Object
      -> ObjData.ObjData
        -> ObjApi.ObjApi
          -> ObjService.ObjService            <- this
            -> ObjServiceXxx.ObjServiceApi    (per service type)

    Objects.Object
      -> ObjData.ObjData
        -> ObjApi.ObjApi
          -> ObjSupervisor.Supervisor
            -> ObjService.ServiceSet          <- queue reader


    ServiceSet.Read(context)
      1. Extract service_code, guid from MQ context
      2. Look up Service name (cached in SERVICE_CODE_BUFFER)
      3. Create ObjService instance
      4. ObjService.Read(service_code)
         a. Load def_service row (sql_read_object)
         b. If not cached: service_base_factory()
            - Scan factory.service/ directories (cached)
            - Import ObjService{Type}.py module
            - Load remote connection (single query)
            - Load pre/post SQL
            - Cache in SERVICE_BASE_CACHE
         c. If cached: copy.copy() from cache
      5. ObjService.Process(service_code, params)
         a. Transfer params to FObj
         b. Build payload (TXT/CSV/JSON/INJECT)
         c. Execute PreSQL
         d. Connect + Send
         e. Execute PostSQL
         f. stage_update(): write results
         g. Return [Result1, Result2, Result3]

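
Step 4 of the flow above (build once, then hand out copies from the cache) can be sketched as follows. This is a minimal illustration, not the module's actual code: `ServiceBase`, `build_service_base`, and `read_service` are hypothetical names, and the sketch assumes a copy is returned on every call, including the first.

```python
import copy

# Stand-in for the module-level cache named in the flow above.
SERVICE_BASE_CACHE = {}


class ServiceBase:
    """Hypothetical container for a resolved service configuration."""

    def __init__(self, service_code, service_type):
        self.service_code = service_code
        self.service_type = service_type
        self.params = {}


def build_service_base(service_code):
    """Placeholder for the expensive path (directory scan, import, SQL loads)."""
    build_service_base.calls += 1
    return ServiceBase(service_code, service_type="Demo")


build_service_base.calls = 0


def read_service(service_code):
    """Return a per-request shallow copy of the cached configuration."""
    key = service_code.lower()  # cache key is the lowercased service code
    if key not in SERVICE_BASE_CACHE:
        SERVICE_BASE_CACHE[key] = build_service_base(service_code)
    # copy.copy() is shallow: rebinding an attribute on the copy leaves the
    # cached object untouched, but nested mutable attributes stay shared.
    return copy.copy(SERVICE_BASE_CACHE[key])
```

Because the copy is shallow, any mutable attribute that a request modifies in place (a dict or list, for example) would leak into later requests; such attributes need to be rebound rather than mutated.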

| Method | Description |
|---|---|
| Read(ServiceCode, Service) | Loads service definition, resolves factory, caches. |
| Process(service_code, service, param1..param9, ...) | Full service execution: payload, connect, send, update. |
| BuildPayload(FObj) | Builds payload based on PayloadType (TXT, CSV, JSON, INJECT). TXT/CSV execute PayloadSql and format results. |
| patch_param(text, depth) | Single-pass token substitution using a dict of $param1$..$param9$, $guid$, $baseguid$, $result1$..$result9$, $zonedate$, etc. |
| stage_update() / stage_launch() / stage_reset() | Write service execution results to stage_service. |
| service_base_factory(service_set, service_type) | Scans factory.service/ and package.* directories for ObjService{Type}.py, imports the module, returns an ObjServiceApi instance. Directory listing is cached per process via _SERVICE_FILE_CACHE. |

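
To illustrate what single-pass token substitution means for patch_param, here is a minimal sketch. It swaps in a regular expression for the dict scan the module describes, so it shows the idea rather than the actual implementation; the function name `patch_tokens` and its `values` argument are hypothetical.

```python
import re

# Matches $name$ tokens such as $param1$, $guid$, or $result3$.
TOKEN_RE = re.compile(r"\$([A-Za-z0-9_]+)\$")


def patch_tokens(text, values):
    """Replace every known $token$ in one pass over the text.

    `values` maps lowercase token names (without the $ signs) to their
    replacement values. Unknown tokens are left unchanged.
    """
    if "$" not in text:
        return text  # early exit: nothing to substitute

    def _substitute(match):
        name = match.group(1).lower()
        if name in values:
            return str(values[name])
        return match.group(0)  # keep unknown tokens as written

    return TOKEN_RE.sub(_substitute, text)
```

A call such as `patch_tokens("id=$guid$ p=$param1$", {"guid": "abc", "param1": 7})` walks the text once, whereas a chain of `.replace()` calls rescans the whole string for every token.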

| Variable | Key | Purpose |
|---|---|---|
| SERVICE_BASE_CACHE | service_code.lower() | Cached factory objects (full service config) |
| SERVICE_CODE_BUFFER | service_code | Service name lookup cache |
| _SERVICE_FILE_CACHE | package:archetype | Filesystem scan cache |

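
The filesystem scan cache can be pictured as a dict keyed by `package:archetype` that stores one directory listing per key for the life of the process. The sketch below is an assumption about the shape of that cache, not the module's code; `list_service_modules` and its `directory` argument are hypothetical.

```python
import os

# Stand-in for the per-process cache listed in the table above.
_SERVICE_FILE_CACHE = {}


def list_service_modules(package, archetype, directory):
    """Return the ObjService*.py file names in `directory`.

    The directory is listed only on the first call for a given
    package:archetype key; later calls reuse the stored result.
    """
    key = f"{package}:{archetype}"
    if key not in _SERVICE_FILE_CACHE:
        _SERVICE_FILE_CACHE[key] = frozenset(
            name
            for name in os.listdir(directory)
            if name.startswith("ObjService") and name.endswith(".py")
        )
    return _SERVICE_FILE_CACHE[key]
```

The trade-off of a per-process cache is that module files added after the first scan are not seen until the process restarts.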

| Table | Purpose |
|---|---|
| def_service | Service definitions (code, type, payload config) |
| def_service_parameters | Parameter mapping per service |
| stage_service | Service execution queue |
| stage_service_monitor | Queue health monitoring |
| Def_RemoteConnections | Remote endpoint credentials |

All static SQL moved to ObjService.yaml under
database.queries:

| Query | Purpose |
|---|---|
| read_service | Load service definition |
| load_remote_connection | Single query for all remote config |
| load_pre_sql | Load pre-execution SQL |
| load_post_sql | Load post-execution SQL |
| stage_insert | Insert into stage_service |
| stage_launch | Set status to LAUNCH |
| stage_reset | Reset status |
| get_service_by_code | Lookup Service name |
| register_servicetype | Register new service type |
| register_service | Register new service |

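
As a rough sketch of how named queries under database.queries might be looked up and cached per instance, consider the following. The dict stands in for the parsed YAML file, the SQL strings are placeholders, and the `QueryHolder` class is hypothetical; only the query names and the get_queries() name come from this document.

```python
# Stand-in for the parsed contents of ObjService.yaml.
# The SQL text is a placeholder, not the real query.
PARSED_YAML = {
    "database": {
        "queries": {
            "read_service": "/* placeholder SQL */",
            "stage_launch": "/* placeholder SQL */",
        }
    }
}


class QueryHolder:
    """Hypothetical holder that resolves the query map once per instance."""

    def __init__(self, config):
        self._config = config
        self._queries = None
        self.loads = 0  # counts how often the query map was resolved

    def get_queries(self):
        if self._queries is None:
            self.loads += 1
            self._queries = self._config["database"]["queries"]
        return self._queries

    def query(self, name):
        """Return the SQL text for a named query."""
        try:
            return self.get_queries()[name]
        except KeyError:
            raise KeyError(f"unknown query name: {name}") from None
```

Keeping SQL in a keyed map means a typo in a query name fails immediately with a clear error instead of producing malformed SQL.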

| # | Fix | Before | After |
|---|---|---|---|
| 1 | 3 RemoteConnection queries consolidated | 3 DB calls | 1 DB call |
| 2 | get_package_and_archetype() cached | 4 calls per lifecycle | 1 call |
| 3 | Single-pass patch_param() | 28+ sequential .replace() | Dict scan, early exit |
| 4 | Filesystem scan cached | os.listdir() x4 dirs per call | Once per process |
| 5 | stage_update() result loop | 8 try/except blocks | Single getattr() loop |
| 6 | BuildPayload() string building | template += in loops | parts.append() + join() |
| 7 | get_queries() cached per instance | Called per method | Called once |
| 8 | __dict__ scan for params removed | Loop with startswith | Direct assignment |
| 9 | list(dict.keys()) removed | Unnecessary conversion | Direct in dict |
| 10 | All SQL moved to YAML | Inline f-strings | get_queries() |
| 11 | Constants to ObjConstants | Module-level | ServiceConstants class |
| 12 | Dead code removed | AVRO_DICT, STRATEGY_MODE, inline tests | Deleted |
| 13 | Bug fix: Result8 wrote Result9 | Copy-paste error | Corrected |

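
Fix 5 replaces repeated try/except blocks with one getattr() loop, which also removes the kind of copy-paste slip described in fix 13. A minimal sketch of that pattern follows; `collect_results` is a hypothetical name, and the range of Result1 through Result9 is an assumption based on the $result1$..$result9$ tokens.

```python
from types import SimpleNamespace


def collect_results(fobj, count=9):
    """Read Result1..Result<count> from an object in a single loop.

    Missing attributes yield None instead of raising, so no per-field
    try/except is needed and each name is generated from the loop index.
    """
    return [getattr(fobj, f"Result{i}", None) for i in range(1, count + 1)]


# Usage: an object that only defines some of the result attributes.
example = SimpleNamespace(Result1="ok", Result8="eight", Result9="nine")
results = collect_results(example)
```

Because the attribute name is derived from the index, Result8 cannot accidentally read or write Result9.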

    # Reset LAUNCH status and process queue
    python ObjService.py stage [filter] [filter2]

    # Start dispatcher
    python ObjService.py dispatcher [instance]

    # Start worker
    python ObjService.py worker [instance]

    # Start default service
    python ObjService.py service

Updated: 2026-04-13