The ObjMessageQueue class provides a high-level interface for interacting with message queue systems, primarily RabbitMQ. It abstracts the complexities of connecting, publishing, and consuming messages, offering a simplified and robust API for developers. This class is designed to be resilient, with built-in mechanisms for connection retries and message buffering.
The ObjMessageQueue class handles:
It can be configured to work with both local and remote RabbitMQ instances and can load credentials and connection details from a central configuration file.
This is the main class for interacting with the message queue.
__init__(self, DB=0, Connection: str = "")
Initializes a new instance of the ObjMessageQueue class. It loads RabbitMQ connection settings (username, password, host, port) from the global configuration.
Connect(self, counter: int = 0)
Establishes a connection to the RabbitMQ server. If the connection fails, it retries up to 5 times with exponential backoff.
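The retry behaviour described for Connect can be illustrated with a minimal sketch. This is not the class's actual implementation: the function name connect_with_backoff, the base_delay of one second, and the injectable sleep parameter are assumptions made for illustration. Only the limit of 5 retries comes from the description above.

```python
import time

MAX_RETRIES = 5  # retry limit taken from the Connect description


def connect_with_backoff(connect, max_retries=MAX_RETRIES, base_delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    connect, base_delay and sleep are hypothetical names, not part of ObjMessageQueue.
    """
    for attempt in range(max_retries + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted; surface the last error
            # Wait 1s, 2s, 4s, 8s, 16s between successive attempts.
            sleep(base_delay * (2 ** attempt))
```

Passing sleep as a parameter keeps the sketch easy to exercise without real delays; production code would likely also cap the maximum wait.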
connect_rabbit(self) -> None
The core method for connecting to RabbitMQ. It handles both standard and TLS-secured connections and sets up the necessary channel for communication.
create_queue(self, context: str, queue: str, counter=0) -> str
Creates a queue on the RabbitMQ server if it doesn't already exist. The queue name is constructed from the context and queue parameters. It will retry up to 10 times on failure.
context: A string defining the context (e.g., application name).
queue: The specific name of the queue.
push(self, context: str, queue: str, guid: str, payload: dict, ...)
Publishes a message to the specified queue. The payload is a dictionary that gets converted to a JSON string before being sent.
context: The context for the queue.
queue: The name of the queue.
guid: A unique identifier for the message.
payload: The dictionary data to be sent as the message body.
Example: Pushing a message
import ObjMessageQueue

mq = ObjMessageQueue.ObjMessageQueue()
mq.Connect()

payload_data = {
    "user_id": 123,
    "task": "process_payment",
    "amount": 99.99
}

mq.push(
    context="billing",
    queue="payments",
    guid="unique-guid-12345",
    payload=payload_data
)
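To show roughly what create_queue and push might do underneath, here is a hedged sketch against a pika-style channel object. Several things are assumptions rather than documented behaviour: that the underlying client is pika, the "context.queue" naming scheme, the durable flag, and the JSON envelope that carries the guid next to the payload. The helper names queue_name and publish are hypothetical.

```python
import json


def queue_name(context, queue):
    # Assumption: the real naming scheme is not documented; "context.queue" is illustrative.
    return f"{context}.{queue}"


def publish(channel, context, queue, guid, payload):
    """Declare the queue if needed, then publish the payload as a JSON string."""
    name = queue_name(context, queue)
    # Re-declaring an existing queue with the same arguments is a no-op in RabbitMQ.
    channel.queue_declare(queue=name, durable=True)
    # Assumption: the guid travels in the body; the real class may use message properties.
    body = json.dumps({"guid": guid, "payload": payload})
    # The default exchange ("") routes a message to the queue named by routing_key.
    channel.basic_publish(exchange="", routing_key=name, body=body)
    return name
```

The channel argument is expected to behave like the object returned by a pika BlockingConnection's channel() call; any object with matching queue_declare and basic_publish methods works for experimentation.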
pop(self, context: str, queue: str)
Retrieves one or more messages from the specified queue. Messages are fetched and placed into an internal buffer for processing. This method blocks until at least one message is received or the queue is found to be empty.
pop_list(self, context, queue, max: int = MQ_QOS_BUFFER) -> list
Retrieves a list of messages from the internal buffer. If the buffer is empty, it first calls pop() to fetch messages from RabbitMQ.
max: The maximum number of messages to retrieve.
Example: Consuming messages
import ObjMessageQueue

mq = ObjMessageQueue.ObjMessageQueue()
mq.Connect()

# Pop a list of up to 5 messages
messages = mq.pop_list(context="billing", queue="payments", max=5)
for msg in messages:
    print(f"Processing message: {msg}")
    # Add processing logic here
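The interplay between pop(), pop_list(), and the internal buffer can be sketched with a small stand-in class. BufferedConsumer and its fetch callable are hypothetical and only model the documented behaviour (refill from the broker when the buffer is empty, then hand out at most a given number of messages); the real class may differ in detail.

```python
from collections import deque


class BufferedConsumer:
    """Hypothetical stand-in for the buffering behaviour of pop() and pop_list()."""

    def __init__(self, fetch):
        self._fetch = fetch     # callable that returns a list of messages from the broker
        self._buffer = deque()  # internal buffer, oldest message first

    def qsize(self):
        # Number of messages currently held locally, not on the server.
        return len(self._buffer)

    def pop_list(self, max_items):
        if not self._buffer:
            # Refill only when the buffer is empty, mirroring the pop() call.
            self._buffer.extend(self._fetch())
        count = min(max_items, len(self._buffer))
        return [self._buffer.popleft() for _ in range(count)]
```

One consequence of this design is that a call may return fewer than the requested number of messages even when more are waiting on the server, because the buffer is only refilled once it is empty.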
qsize(self) -> int
Returns the number of messages currently in the internal buffer.
count(self, context: str, queue: str) -> int
Returns the number of messages waiting in the actual RabbitMQ queue on the server. This is useful for monitoring queue depth.
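One common way to read server-side queue depth with a pika-style channel is a passive queue declaration, sketched below. Whether count() uses this approach is an assumption, and queue_depth is a hypothetical helper name.

```python
def queue_depth(channel, name):
    """Return the number of ready messages in the queue called name."""
    # passive=True only inspects the queue; the broker raises a channel
    # error if the queue does not exist instead of creating it.
    result = channel.queue_declare(queue=name, passive=True)
    return result.method.message_count
```

This differs from qsize(), which reports only what has already been fetched into the local buffer.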
load(self, context: str, queue: str, sql: str = "", ...)
Loads data from a database (either SQL or MongoDB) and pushes it into the message queue. This is useful for batch processing or migrating data.
sql: An SQL query to fetch data. If provided, the method loads from a SQL database.
mongo_collection: The name of a MongoDB collection to fetch data from if sql is not provided.
Returns: True if data was successfully loaded and pushed to the queue, otherwise None.
status(self, context: str, queue: str) -> dict
Provides a status report for the queueing system, including the number of pending, staged, and recently completed jobs.
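The SQL branch of load() described above can be approximated as follows. This sketch uses the standard-library sqlite3 module purely so that it runs anywhere; the database driver the class actually uses is not stated. The helper name load_from_sql, the push callable, and the choice to return None when the query yields no rows are assumptions.

```python
import sqlite3


def load_from_sql(conn, sql, push):
    """Run sql and hand each row to push() as a dict.

    Returns True if at least one row was pushed, otherwise None
    (an assumed reading of the documented return value).
    """
    cursor = conn.execute(sql)
    columns = [col[0] for col in cursor.description]
    pushed = 0
    for row in cursor:
        # Each row becomes a dict keyed by column name, ready for JSON encoding.
        push(dict(zip(columns, row)))
        pushed += 1
    return True if pushed else None
```

In practice push would wrap a call such as mq.push(context, queue, guid, payload), with a guid generated per row.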
The module includes a simple CLI powered by typer for basic operations.
submit: Loads tasks into a queue. It can be configured to load from either a SQL database or MongoDB.
pop: Pops a small number of messages from the queue for testing.
popall: A command to monitor and process all messages from a queue (implementation-specific).
verify: Checks the connection and prints the current queue status.
Example CLI Usage:
python ObjMessageQueue.py submit
python ObjMessageQueue.py verify
cythonize -3 -a -i ObjMessageQueue.py
Compiling /home/axion/projects/axion/factory.core/ObjMessageQueue.py because it changed.
[1/1] Cythonizing /home/axion/projects/axion/factory.core/ObjMessageQueue.py
Updated : 2025-09-10