ObjSignalMqtt provides MQTT-based signaling for inter-process communication and event broadcasting. It extends the signal system to use MQTT as a transport layer for distributed messaging.
Module: factory.core/ObjSignalMqtt.py
Inherits from: ObjSignal.ObjSignal or ObjMqtt.ObjMqtt
Protocol: MQTT (Message Queuing Telemetry Transport)
Typical topic structure:
axion/signals/{signal_type}axion/signals/{module}/{event}axion/signals/broadcastfrom ObjSignalMqtt import ObjSignalMqtt
# Create instance
signal_mqtt = ObjSignalMqtt()
# Publish signal
signal_mqtt.publish_signal(
signal_type="workflow_complete",
payload={"workflow_id": "123", "status": "success"},
topic="axion/signals/workflow"
)
from ObjSignalMqtt import ObjSignalMqtt
signal_mqtt = ObjSignalMqtt()
# Subscribe to topic
def handle_signal(topic, payload):
print(f"Received signal on {topic}: {payload}")
signal_mqtt.subscribe_signal(
topic="axion/signals/workflow",
callback=handle_signal
)
# Start listening
signal_mqtt.start_listening()
# Broadcast system-wide event
signal_mqtt.broadcast(
event="system_shutdown",
data={"reason": "maintenance", "eta": "5min"}
)
MQTT connection configured in config.yaml:
mqtt:
host: localhost
port: 1883
username: axion
password: ${environment_MQTT_PASSWORD}
paho-mqtt - MQTT client libraryObjMqtt or ObjSignal - Parent classObjMqtt.py - MQTT client functionalityObjSignal.py - Base signal handlingObjNotify.py - MQTT-based notifications