I'm trying to add delayed-message functionality to RabbitMQ: I need a message to be delivered two weeks after it is published. As far as I know, no plugin should be needed for this. Also, once the message has been consumed, how should I reschedule it with a new x-delay so that it is delivered again after another two weeks? Where should I add the x-delay to the message?

config:

"messageQueue": {
        "connectionString": "amqp://guest:guest@localhost:5672?heartbeat=5",
        "queueName": "history",
        "exchange": {
            "type": "headers",
            "prefix": "history."
        },
        "reconnectTimeout": 5000
    },

service:

import amqplib from 'amqplib'
import config from 'config'

import logger from './logger'

const {reconnectTimeout, connectionString, exchange: {prefix, type: exchangeType}, queueName} = config.messageQueue

const onConsume = (expectedMessages, channel, onMessage) => async message => {
    const {fields: {exchange}, properties: {correlationId, replyTo}, content} = message

    logger.silly(`consumed message from ${exchange}`)

    const messageTypeName = exchange.substring(exchange.startsWith(prefix) ? prefix.length : 0)

    const messageType = expectedMessages[messageTypeName]

    if (!messageType) {
        logger.warn(`Unexpected message of type ${messageTypeName} received. The service only accepts messages of types `, Object.keys(expectedMessages))

        return
    }

    const deserializedMessage = messageType.decode(content)

    const object = deserializedMessage.toJSON()

    const result = await onMessage(messageTypeName, object)

    if (correlationId && replyTo) {
        const {type, response} = result

        const encoded = type.encode(response).finish()

        channel.publish('', replyTo, encoded, {correlationId})
    }
}

const startService = async (expectedMessages, onMessage) => {

    const restoreOnFailure = e => {
        logger.warn('connection with message bus lost due to error', e)
        logger.info(`reconnecting in ${reconnectTimeout} milliseconds`)

        setTimeout(() => startService(expectedMessages, onMessage), reconnectTimeout)
    }

    const exchanges = Object.keys(expectedMessages).map(m => `${prefix}${m}`)

    try {
        const connection = await amqplib.connect(connectionString)

        connection.on('error', restoreOnFailure)

        const channel = await connection.createChannel()

        const handleConsume = onConsume(expectedMessages, channel, onMessage)

        const queue = await channel.assertQueue(queueName)

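        // Await each declaration and binding so a failure reaches the surrounding try/catch
        for (const exchange of exchanges) {
            await channel.assertExchange(exchange, exchangeType, {durable: true})

            await channel.bindQueue(queue.queue, exchange, '')
        }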

        logger.debug(`start listening messages from ${exchanges.join(', ')}`)

        channel.consume(queue.queue, handleConsume, {noAck: true})
    }
    catch (e) {
        logger.warn('error while subscribing for messages', e)

        restoreOnFailure(e)
    }
}

export default startService
  • You can use a scheduler such as node-schedule to send the message after 2 weeks. Commented Apr 2, 2018 at 12:42

1 Answer

RabbitMQ has a plug-in for scheduling messages. You can use it, subject to an important design caveat which I explain below.

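Usage Steps

You must first enable it (it is a community plug-in, so the plug-in file may need to be downloaded into the broker's plugins directory beforehand):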

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Then, you have to set up a delayed exchange:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

Finally, set the x-delay header on each message you publish (the delay is in milliseconds).

byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
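
The snippets above are Java, while the question uses Node.js with amqplib. A rough sketch of the same two steps in that setting could look like the following. The helper names (declareDelayedExchange, publishDelayed, withReschedule) are made up for illustration, and `channel` is assumed to be an amqplib channel such as the one returned by connection.createChannel(). The last helper addresses the rescheduling part of the question by publishing the same payload again once the handler has finished.

```javascript
// Sketch only: `channel` is assumed to be an amqplib channel;
// all function names here are hypothetical.

// Declare an exchange of the plug-in's "x-delayed-message" type.
// "x-delayed-type" tells the plug-in how to route once the delay has elapsed
// (for the question's setup this would probably be 'headers').
const declareDelayedExchange = (channel, exchangeName, routingType = 'direct') =>
    channel.assertExchange(exchangeName, 'x-delayed-message', {
        durable: true,
        arguments: {'x-delayed-type': routingType}
    })

// Publish a message that the exchange holds back for `delayMs` milliseconds.
const publishDelayed = (channel, exchangeName, routingKey, content, delayMs) =>
    channel.publish(exchangeName, routingKey, content, {
        headers: {'x-delay': delayMs}
    })

// Wrap a message handler so that, after it completes, the same payload is
// published again with the same delay (a recurring message).
const withReschedule = (channel, exchangeName, routingKey, delayMs, handler) =>
    async message => {
        await handler(message)
        publishDelayed(channel, exchangeName, routingKey, message.content, delayMs)
    }
```

With this shape, the x-delay header is added at publish time only; the consumer callback passed to channel.consume would be the wrapped handler. Note that if the handler throws, the message is not republished, so the recurrence stops unless the error is handled.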

Two weeks is equal to (14*24*60*60*1000 = 1,209,600,000) milliseconds.
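
Millisecond figures like this are easy to get wrong, so it may help to compute the delay from a number of days rather than hard-coding it. The sketch below does that for 14 days. The helper name daysToDelayMs is hypothetical, and the upper bound of 2^32 - 1 milliseconds (roughly 49 days) is what I recall the plug-in documenting; treat it as an assumption and check it against the version you run.

```javascript
const MS_PER_DAY = 24 * 60 * 60 * 1000

// Assumed maximum delay accepted by the plug-in (about 49.7 days).
const MAX_DELAY_MS = 2 ** 32 - 1

// Convert a whole or fractional number of days to an x-delay value,
// rejecting anything outside the assumed valid range.
const daysToDelayMs = days => {
    const delayMs = days * MS_PER_DAY

    if (!Number.isInteger(delayMs) || delayMs < 0 || delayMs > MAX_DELAY_MS) {
        throw new RangeError(`delay of ${days} days is outside the supported range`)
    }

    return delayMs
}

const TWO_WEEKS_MS = daysToDelayMs(14) // 1209600000
```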

Important Caveat

As I explained in this answer, this is a really bad thing to ask the message broker to do.

It's important to keep in mind, when dealing with message queues, they perform a very specific function in a system: to hold messages while the processor(s) are busy processing earlier messages. It is expected that a properly-functioning message queue will deliver messages as soon as reasonable. Basically, the fundamental expectation is that as soon as a message reaches the head of the queue, the next pull on the queue will yield the message -- no delay.

Delay becomes a result of how a system with a queue processes messages. In fact, Little's Law offers some interesting insights into this. If you're going to stick an arbitrary delay in there, you really have no need of a message queue to begin with - all your work is scheduled up front.

So, in a system where a delay is necessary (for example, to join/wait for a parallel operation to complete), you should be looking at other methods. Typically a queryable database would make sense in this particular instance. If you find yourself keeping messages in a queue for a pre-set period of time, you're actually using the message queue as a database - a function it was not designed to provide. Not only is this risky, but it also has a high likelihood of hurting the performance of your message broker.
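
To make the database alternative a little more concrete, here is a minimal sketch of the idea: each recurring job is a row with a due time, and a poller periodically picks up the rows that are due, runs them, and moves their due time forward. An in-memory array stands in for the database table here, and createSchedule, add, and runDue are hypothetical names, not part of any library.

```javascript
// In-memory stand-in for a database table of scheduled jobs.
const createSchedule = () => {
    const jobs = []

    return {
        // Register a job that is first due at `dueAt` (epoch ms)
        // and then again every `intervalMs`.
        add: (name, dueAt, intervalMs) => {
            jobs.push({name, dueAt, intervalMs})
        },

        // Run every job whose due time has passed, then push its due time
        // forward by one interval. Returns the number of jobs that ran.
        runDue: (now, run) => {
            const due = jobs.filter(job => job.dueAt <= now)

            due.forEach(job => {
                run(job)
                job.dueAt += job.intervalMs
            })

            return due.length
        }
    }
}
```

In a real system the filter would be a query along the lines of "due time is less than or equal to now", and the poller (for example a setInterval callback calling runDue(Date.now(), ...)) would publish an ordinary, undelayed message for each due job. The broker then only carries messages that are ready to be processed.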
