
NOTICE: All information contained herein is, and remains
the property of TechnoCore.
The intellectual and technical concepts contained
herein are proprietary to TechnoCore and dissemination of this information or reproduction of this material
is strictly forbidden unless prior written permission is obtained
from TechnoCore.
The ObjMqtt class provides a robust client for connecting to an MQTT broker, subscribing to topics, and processing incoming messages. Its primary function is to listen for event messages, dynamically parse their JSON payloads, and store the data in a relational database in a structured format.
A key feature of this class is its ability to automatically manage the database schema based on the content of the messages it receives.
When an event message is received, the ObjMqtt class performs the following actions to store its data:
Table Name Determination: It constructs a table name based on the event_type field in the message payload. The format is track_event_<event_type>. For example, an event_type of user_login will result in a table named track_event_user_login.
Dynamic Column Creation: The class inspects the top-level keys within the event_state JSON object of the payload. For each key, it ensures a corresponding column exists in the target table.
Data Type Inference: The system automatically determines the appropriate data type for each new column:
If a value is a dictionary (dict) or a list (list), the column is created with the JSON data type.
For all other values, the column is created with the TEXT data type.
Data Insertion: The message data is then inserted as a new row in the table.
This process ensures that the database schema evolves automatically with the data, requiring no manual intervention when new fields are introduced in the MQTT messages.
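The steps above can be sketched as follows. This is a minimal illustration, not the ObjMqtt implementation: the helper names table_name_for, infer_column_type, and missing_column_ddl are hypothetical, and the ALTER TABLE syntax is generic SQL that may need adjusting for the target database.

```python
def table_name_for(event_type: str) -> str:
    """Build the table name in the track_event_<event_type> format."""
    return f"track_event_{event_type}"


def infer_column_type(value: object) -> str:
    """Return JSON for dict or list values and TEXT for everything else."""
    return "JSON" if isinstance(value, (dict, list)) else "TEXT"


def missing_column_ddl(table: str, event_state: dict, existing: set) -> list:
    """Generate one ALTER TABLE statement per top-level key without a column.

    `existing` is the set of column names already present in the table
    (hypothetical input; a real client would query the database for it).
    """
    return [
        f'ALTER TABLE {table} ADD COLUMN "{key}" {infer_column_type(value)}'
        for key, value in event_state.items()
        if key not in existing
    ]


if __name__ == "__main__":
    state = {"username": "jdoe", "metadata": {"client": "WebApp"}}
    table = table_name_for("user_login")
    for statement in missing_column_ddl(table, state, existing={"username"}):
        print(statement)
```

Because table and column names are derived from message content in this sketch, a production version would presumably need to validate or escape them before building SQL.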
Every table created by the ObjMqtt class includes the following standard columns:
Guid: A unique identifier for the event.
Package: The software package or service that generated the event.
SourceHost: The hostname or IP address of the machine where the event originated.
Payload: A JSON column used as a fallback to store the event_state if it is not a dictionary.
created_at: A timestamp indicating when the record was inserted.
Consider an MQTT message published to the event/system topic with the following JSON payload:
{
  "guid": "evt-12345-abcde",
  "package": "auth_service",
  "host": "prod-server-01",
  "event_type": "user_login",
  "event_state": {
    "username": "jdoe",
    "ip_address": "192.168.1.100",
    "successful": true,
    "metadata": {
      "client": "WebApp",
      "version": "2.5.1"
    }
  }
}
For this message, the ObjMqtt class will:
Target the table track_event_user_login.
Read the top-level keys of event_state: username, ip_address, successful, and metadata.
Ensure that a column exists for each of these keys in the track_event_user_login table.
Infer the column types:
username, ip_address, and successful will be TEXT.
metadata will be JSON, because its value is a dictionary.
To optimize database performance, messages are processed in batches. The class collects messages in memory and inserts them into the database in bulk, either when the batch reaches a certain size (batch_size) or when a timeout (batch_timeout) elapses.
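The size-or-timeout batching rule can be sketched as below. This is an illustration under assumptions, not the class's actual code: BatchBuffer and its flush callback are hypothetical names, the default values for batch_size and batch_timeout are placeholders, and the timeout is measured here from the oldest buffered row.

```python
import time
from typing import Callable, Optional


class BatchBuffer:
    """Collect rows in memory and flush them when the batch is full or too old."""

    def __init__(self, flush: Callable[[list], None],
                 batch_size: int = 100, batch_timeout: float = 5.0):
        self._flush_fn = flush              # e.g. a function doing a bulk INSERT
        self.batch_size = batch_size        # placeholder default (assumption)
        self.batch_timeout = batch_timeout  # seconds, placeholder default (assumption)
        self._rows: list = []
        self._first_added: Optional[float] = None  # time the oldest row arrived

    def add(self, row, now: Optional[float] = None) -> None:
        """Buffer one row, then flush if a limit has been reached."""
        now = time.monotonic() if now is None else now
        if not self._rows:
            self._first_added = now
        self._rows.append(row)
        self.maybe_flush(now)

    def maybe_flush(self, now: Optional[float] = None) -> None:
        """Flush if the batch is full or its oldest row has waited too long."""
        now = time.monotonic() if now is None else now
        if not self._rows:
            return
        full = len(self._rows) >= self.batch_size
        expired = now - self._first_added >= self.batch_timeout
        if full or expired:
            rows, self._rows = self._rows, []
            self._first_added = None
            self._flush_fn(rows)
```

In this sketch, add would be called from the message callback and maybe_flush from a periodic timer, so that a partially filled batch is still written when traffic is low.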
ObjMqtt uses the paho-mqtt VERSION2 callback API. The _on_connect
method signature is:
def _on_connect(self, client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        # Connected: subscribe to topics here.
        ...
    else:
        # Failed: str(reason_code) gives a human-readable description.
        ...
reason_code is a paho ReasonCode object; reason_code == 0 tests for
success and str(reason_code) gives a descriptive error string on failure.
The script can be run as a command-line tool to monitor MQTT topics directly.
python factory.core.ObjMqtt.py [TOPIC]
TOPIC is optional. If it is omitted, the tool defaults to # to subscribe to all topics.
Example:
# Monitor all topics
python factory.core.ObjMqtt.py
# Monitor a specific topic
python factory.core.ObjMqtt.py "axion/events/system"
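
One way the optional TOPIC argument could be parsed is shown below. This is a sketch using the standard argparse module; the function name parse_args is hypothetical, and the script's real argument handling may differ.

```python
import argparse


def parse_args(argv=None):
    """Parse the command line: one optional positional TOPIC, defaulting to '#'."""
    parser = argparse.ArgumentParser(description="Monitor MQTT topics.")
    parser.add_argument(
        "topic",
        nargs="?",          # the argument may be omitted
        default="#",        # MQTT multi-level wildcard: all topics
        metavar="TOPIC",
        help="MQTT topic to subscribe to (default: all topics)",
    )
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()
    print(f"Subscribing to: {args.topic}")
```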