I am trying to send and receive simple messages between two web APIs using RabbitMQ. The code is very simple for now; I only want to confirm that the two APIs can communicate with each other. The issue is that I do not receive all the messages, and I cannot find a pattern that separates the ones I lose from the ones I receive. The sample code is below.

For sending messages

public class QueueController : Controller
{
    [HttpGet]
    [Route("send")]
    public async Task<IActionResult> Send()
    {
        QueueManager.Send();
        return Ok();
    }
}

public class QueueManager
{
    public static string queueName = "test-queue";
    public static int count = 0;
    public static void Send()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var queue = channel.QueueDeclare(queueName,
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
            count++;
            var message = new { Message = "Sent Message", count = count };
            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
            channel.BasicPublish("", queueName, null, queueMessage);
        }
    }
}

For receiving messages

public class QueueController : Controller
{
    [HttpGet]
    [Route("receive")]
    public async Task<IActionResult> Receive()
    {
        QueueManager.Receive();
        return Ok();
    }
}

public class QueueManager
{
    public static string queueName = "test-queue";
    public static void Receive()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var msg = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(msg);
                Console.WriteLine(message);
            };
            channel.BasicConsume(queueName, true, consumer);
        }
    }
}

When I check the console of my receiver API, the set of messages that arrives looks random. For example, when I sent 7 messages, I received only numbers 2, 3, and 7, so I lost 4 of the 7 messages. I am not sure what is wrong here. Also, in the management console I can see that the queue is emptied only when I call the endpoint in the receiver API, yet the missing messages still do not appear in the console. Any help will be appreciated.

  • Please note that these issues do not occur when I use a console application to receive the messages, only when I make an API call to read the queue. Commented Jun 15, 2021 at 9:38

1 Answer

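I created two solutions and used your code in them, but it did not work for me either. In your receiver, the using blocks dispose the channel and the connection as soon as Receive() returns, so the consumer is shut down before it has handled every delivery; with autoAck set to true, those messages have already been removed from the queue. I changed the code as follows.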

Your producer class:

public class QueueManager2
{
    public static string queueName = "test-queue";
    public static int count = 0;
    public static void Send()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            count++;
            var message = new { Message = "Sent Message", count = count };

            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
             
            channel.BasicPublish(exchange: "",
                                 routingKey: queueName,
                                 basicProperties: null,
                                 body: queueMessage);
        }
    }
}

And your consumer class:

public class QueueManager2
{
    public static string queueName = "test-queue";
    public static void Receive()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        // No using blocks here: the connection and channel must stay open
        // so the consumer keeps receiving after this method returns.
        var rabbitMqConnection = factory.CreateConnection();
        var rabbitMqChannel = rabbitMqConnection.CreateModel();

        rabbitMqChannel.QueueDeclare(queue: queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

        // Deliver at most one unacknowledged message at a time.
        rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        // MessageCount returns a uint; it is not used below and is kept only for inspection.
        uint messageCount = rabbitMqChannel.MessageCount(queueName);

        var consumer = new EventingBasicConsumer(rabbitMqChannel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(message);
            // Acknowledge only after the message has been handled.
            rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        rabbitMqChannel.BasicConsume(queue: queueName,
                                     autoAck: false,
                                     consumer: consumer);
    }
}

Now, if you run your project, you can produce a message and consume it. For the consumer, it is better to use a hosted service that runs continuously; exposing an endpoint to fetch messages is not a good idea.
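
As a rough illustration of the hosted-service idea, here is a minimal sketch. It assumes RabbitMQ.Client 6.x (the same API style as the code above) and the Microsoft.Extensions.Hosting package; the class name QueueConsumerService is hypothetical and not part of the original code.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical class name, used for illustration only.
public class QueueConsumerService : BackgroundService
{
    private const string QueueName = "test-queue";

    // Held as fields so the connection and channel live as long as the service.
    private IConnection _connection;
    private IModel _channel;

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.QueueDeclare(queue: QueueName,
                              durable: false,
                              exclusive: false,
                              autoDelete: false,
                              arguments: null);
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine(message);
            // Acknowledge only after the message has been handled.
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        _channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

        // The consumer is event-driven, so there is nothing to loop on here.
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        // Close the channel and connection when the host shuts down.
        _channel?.Close();
        _connection?.Close();
        base.Dispose();
    }
}
```

Assuming a standard ASP.NET Core setup, the service could be registered with services.AddHostedService<QueueConsumerService>(); so that it starts with the application and keeps a single connection open instead of creating one per request.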

2 Comments

Thanks a lot. I get your point about the hosted service. What I am trying to do is consume the queue on demand; however, it seems that leaving messages in the queue until I actually want them may not be the best idea. I see that you read the message count but never used it. Is there a reason for that, or did you forget to remove it before posting the solution?
Yes, unfortunately I forgot to remove the message count. And I think it is not a good idea to use a message queue for persistence.
