I am using RabbitMQ 4.0.4 with AMQP 1.0 in node via the rhea and rhea promise libraries. I am using topics.
How are the queues related to topics are created and deleted in RabbitMQ with AMQP 1.0 with rhea/rhea-promise?
If I stop and restart a service (investigation after problems), everything goes well except that a queue related to a topic on which the service that I stop/restart sends or receives messages still gets messages. There are still messages posted on that topic, but why this queue is not deleted? I have read about the "auto-delete" property, but the problem is that queues can be deleted too quickly.
So, as this link suggests I create an expiry policy, but my senders and receivers are not able to send/receive messages of "session_error. But I cannot get the exact error since there is no error in the context in the session error event. If I restart the whole system (services and RabbitMQ, and rabbitMQ has persistent volume in order not to lost the messages that not have been delivered), my service still does not work. It is as if it is trying to connect to the queue that has been deleted. The only way to have my system works is to delete the volume of RabbitMQ, and restart the system.
However, there are mechanism which are implemented to receive/send message when the action cannot be performed.
- sender: it is deleted and recreated when a message delivery is "released" because the address is not found.
- receiver: at a given frequency, it is checked whether or not it is active. If not, it is deleted and another receiver is created.
But they are not enough to be able to send/receive messages in RabbitMQ, the queues related to topics are not recreated.
Here is the code to create receiver:
import { getLogger } from "log4js";
import {
Connection,
type EventContext,
type ReceiverOptions,
} from "rhea-promise";
const config: any = {
host: "a-host",
port: "a-port",
transport: "a-transport",
username: "a-username",
password: "a-password",
};
const connection: Connection = new Connection(config);
const receiverOptions: ReceiverOptions = {
name: "a-name",
source: {
address: "a-topic",
capabilities: ["topic"],
},
onSessionError: (context: EventContext) => {
getLogger().error("onSessionError :", context.error);
},
onSessionClose: (context: EventContext) => {
getLogger().error("onSessionClose:", context.error);
},
onClose: (context: EventContext) => {
getLogger().error("onClose :", context.error);
},
onError: (context: EventContext) => {
getLogger().error("onError :", context.error);
},
onMessage: (context: EventContext) => {
if (context.message?.body?.content !== undefined) {
context.message.body = context.message.body.content.toString("utf8");
}
getLogger().info("A message is received");
},
};
const receiver = await connection.createReceiver(receiverOptions);
And here is the code to create sender:
import { getLogger } from "log4js";
import {
Connection,
CreateAwaitableSenderOptions,
Sender,
type EventContext,
} from "rhea-promise";
const config: any = {
host: "a-host",
port: "a-port",
transport: "a-transport",
username: "a-username",
password: "a-password",
};
const connection: Connection = new Connection(config);
function cleanSender(sender: Sender | undefined) {
if (sender) {
sender
.close()
.then(() => sender?.remove())
.catch((error) =>
getLogger().error(`Failed to clean up sender after an error. ${error}`)
);
}
}
const senderOptions: CreateAwaitableSenderOptions = {
name: "a-sender-name",
target: {
dynamic: true,
address: "a-topic",
capabilities: ["topic"],
},
//attached to the session_error event
onSessionError: (context: EventContext) => {
getLogger().error("onSessionError :", context.error);
cleanSender(context.sender);
},
//attached to the session_close event
onSessionClose: (context: EventContext) => {
getLogger().error("onSessionClose :", context.error);
},
//attached to the "sender_error" event
onError: (context: EventContext) => {
getLogger().error("onError :", context.error);
cleanSender(context.sender);
},
// attached to the sender_close event
onClose: (context: EventContext) => {
getLogger().error("onClose :", context.error);
},
};
if (senderOptions && senderOptions.session) {
const sender = senderOptions.session.createAwaitableSender(senderOptions);
} else {
const session = await connection.createSession({
abortSignal: senderOptions && senderOptions.abortSignal,
});
const sender = session.createAwaitableSender(senderOptions);
}
I have specific problem with the connection. There is one connection per service, and each service can have multiple receiver/sender topic.
That is why I am asking: how the queues are created and deleted with the topics in RabbitMQ with AMQP 1.0 ?
Notes:
- there is a reconnection mechansim which works - that is why I haven't given the code.
- the code works on Azure Service Bus - the topics and the subscription to a topic are created in ASB, and the sender/receiver are re created if needed.
Thank you !