0

I am facing a problem that I would like help with. I am developing a background process that listens to a queue on a RabbitMQ server. It works fine if I run it in a .NET Core console application. However, I would like to host it in a more elegant way, such as a web service (which has given me a lot of trouble, as it does not work once installed) or an IIS-hosted web application. I run into a scoped-service problem when I try to host the service (IHostedService) in a .NET Core web application.

The code below works fine in a console application. How can I make it run as an IHostedService in a .NET Core web application? What am I supposed to change? Your help is appreciated. CODE:

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using PaymentProcessor.Models;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.EntityFrameworkCore;

namespace PaymentProcessor
{
    /// <summary>
    /// Hosted background service that consumes payment messages from RabbitMQ,
    /// runs the <c>dbo.PaymentReferencesProcessSingle</c> stored procedure for each
    /// reference number, and forwards the result to the integration queue.
    /// Failed messages are republished to the retry queue until the configured
    /// retry limit is reached, after which they go to the error queue.
    /// </summary>
    public class PaymentProcessingService : HostedService
    {
        // Single source of truth for broker object names; the original repeated the
        // literals and mistyped the exchange name in two of the declarations.
        private const string ExchangeName = "payment_processing_exchange";
        private const string ProcessingQueue = "payment_processing_queue";
        private const string RetryQueue = "payment_processing_retry_queue";
        private const string ErrorQueue = "payment_processing_error_queue";
        private const string IntegrationQueue = "payment_integration_queue";

        private const string ProcessingKey = "processing";
        private const string RetryKey = "processing_retry";
        private const string ErrorKey = "processing_error";
        private const string IntegrationKey = "integration";

        // Pause between attempts to (re)connect to the broker.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(10);

        private readonly IConfiguration configuration;

        private readonly EntitiesContext claimsContext;
        private readonly string connectionString;

        private readonly string hostName;
        private readonly string userName;
        private readonly string password;

        private readonly int maxRetries;

        private IConnectionFactory factory;
        private IConnection connection;
        private IModel channel;

        /// <summary>
        /// Reads the database connection string and the RabbitMQ settings
        /// (<c>Settings:HostName</c>, <c>Settings:UserName</c>, <c>Settings:Password</c>,
        /// <c>Settings:MaxRetries</c>) from configuration.
        /// </summary>
        /// <param name="configuration">Application configuration.</param>
        public PaymentProcessingService(IConfiguration configuration)
        {
            this.configuration = configuration;

            connectionString = configuration.GetConnectionString("StagingContext");
            claimsContext = new EntitiesContext(connectionString);

            hostName = this.configuration.GetValue<string>("Settings:HostName");
            userName = this.configuration.GetValue<string>("Settings:UserName");
            password = this.configuration.GetValue<string>("Settings:Password");
            maxRetries = this.configuration.GetValue<string>("Settings:MaxRetries").ConvertTo<int>();
        }

        /// <summary>
        /// Connects to the broker and starts the consumer, retrying every
        /// <see cref="ReconnectDelay"/> until it succeeds or shutdown is requested.
        /// </summary>
        /// <param name="cancellationToken">Signals that the host is shutting down.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            // Hand control back to the caller immediately: the blocking connection
            // attempts below must not hold up host start-up while the broker is down.
            await Task.Yield();

            factory = new ConnectionFactory { HostName = hostName, UserName = userName, Password = password };

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    StartConsuming();

                    // Release the channel and connection when the host shuts down.
                    cancellationToken.Register(CloseConnection);
                    return;
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Trace.TraceWarning(
                        "RabbitMQ connection attempt failed; retrying in {0}s: {1}",
                        ReconnectDelay.TotalSeconds,
                        ex.Message);

                    // Do not leak a half-initialised connection/channel between attempts.
                    CloseConnection();
                }

                try
                {
                    await Task.Delay(ReconnectDelay, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested while waiting to retry.
                    return;
                }
            }
        }

        /// <summary>
        /// Opens the connection, declares the exchange and queues, and subscribes the consumer.
        /// </summary>
        private void StartConsuming()
        {
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            // Declared once, before any binding refers to it.
            channel.ExchangeDeclare(ExchangeName, "topic");

            channel.QueueDeclare(ProcessingQueue, true, false, false, null);
            channel.QueueBind(ProcessingQueue, ExchangeName, ProcessingKey);

            // NOTE(review): the dead-letter routing key equals the retry queue's own
            // binding key, so expired messages are routed back to the retry queue rather
            // than to the processing queue. "processing" looks like the intended key, but
            // changing the arguments of an already-declared durable queue fails on
            // redeclare, so it is left as-is here - confirm and migrate deliberately.
            var retryQueueArgs = new Dictionary<string, object>
            {
                { "x-dead-letter-exchange", ExchangeName },
                { "x-dead-letter-routing-key", RetryKey },
                { "x-message-ttl", 10000 }
            };
            channel.QueueDeclare(RetryQueue, true, false, false, retryQueueArgs);
            channel.QueueBind(RetryQueue, ExchangeName, RetryKey, null);

            channel.QueueDeclare(ErrorQueue, true, false, false, null);
            channel.QueueBind(ErrorQueue, ExchangeName, ErrorKey, null);

            channel.QueueDeclare(IntegrationQueue, true, false, false, null);
            channel.QueueBind(IntegrationQueue, ExchangeName, IntegrationKey, null);

            // Deliver one unacknowledged message at a time.
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) => HandleDelivery(ea);

            channel.BasicConsume(queue: ProcessingQueue, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// Processes one delivery: runs the stored procedure for the message's reference
        /// number and publishes a fresh envelope to the integration queue. Any failure
        /// routes the message to the retry or error queue. The delivery is always acked.
        /// </summary>
        private void HandleDelivery(BasicDeliverEventArgs ea)
        {
            string message = null;
            try
            {
                // Decoded inside the try so a decoding failure is still routed and acked.
                message = ea.Body.DeSerializeText();

                var saveBundle = JObject.Parse(message);
                var referenceNo = (string)saveBundle["Message"]?["ReferenceNo"];
                if (string.IsNullOrEmpty(referenceNo))
                {
                    throw new InvalidOperationException("Message does not contain a ReferenceNo.");
                }

                var parameters = new[]
                {
                    new SqlParameter
                    {
                        DbType = DbType.String,
                        ParameterName = "ReferenceNo",
                        Value = referenceNo
                    }
                };

                claimsContext.Database.ExecuteSqlCommand("dbo.PaymentReferencesProcessSingle @ReferenceNo", parameters);

                IBasicProperties props = channel.CreateBasicProperties();
                props.Persistent = true;
                props.ContentType = "text/plain";
                props.DeliveryMode = 2;

                var outgoing = new MessageEnvelope
                {
                    RetryCounts = 0,
                    Message = JObject.FromObject(new { ReferenceNo = referenceNo })
                };
                channel.BasicPublish(ExchangeName, IntegrationKey, props, outgoing.Serialize());
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("Payment message processing failed: {0}", ex);
                RouteFailedMessage(ea, message);
            }
            finally
            {
                channel.BasicAck(ea.DeliveryTag, false);
            }
        }

        /// <summary>
        /// Republishes a failed message: to the retry queue while its retry count is below
        /// the configured maximum, otherwise to the error queue. A body that cannot be read
        /// as a <see cref="MessageEnvelope"/> is sent to the error queue unchanged.
        /// </summary>
        private void RouteFailedMessage(BasicDeliverEventArgs ea, string message)
        {
            MessageEnvelope envelope = null;
            try
            {
                if (!string.IsNullOrEmpty(message))
                {
                    envelope = JsonConvert.DeserializeObject<MessageEnvelope>(message);
                }
            }
            catch (JsonException)
            {
                // Malformed payload: handled below as "no envelope".
            }

            if (envelope == null)
            {
                // Retrying an unreadable message cannot succeed; park the raw body.
                channel.BasicPublish(ExchangeName, ErrorKey, null, ea.Body);
                return;
            }

            if (envelope.RetryCounts < maxRetries)
            {
                var retryEnvelope = new MessageEnvelope
                {
                    RetryCounts = envelope.RetryCounts + 1,
                    Message = envelope.Message
                };
                channel.BasicPublish(ExchangeName, RetryKey, null, retryEnvelope.Serialize());
            }
            else
            {
                channel.BasicPublish(ExchangeName, ErrorKey, null, envelope.Serialize());
            }
        }

        /// <summary>
        /// Best-effort disposal of the current channel and connection.
        /// </summary>
        private void CloseConnection()
        {
            try
            {
                channel?.Dispose();
            }
            catch (Exception)
            {
                // Already closed or broken; nothing further to release.
            }

            try
            {
                connection?.Dispose();
            }
            catch (Exception)
            {
                // Already closed or broken; nothing further to release.
            }
        }
    }
}

and then

// NOTE(review): no DI scope is created for a hosted service by default, so this Scoped
// registration is the likely cause of the reported problem. Consider
// services.AddHostedService<PaymentProcessingService>() instead - confirm it is
// available in the framework version in use.
services.AddScoped<IHostedService, PaymentProcessingService>();
2
  • Not really sure what the problem is. Things look generally correct. Have a look here if you want to troubleshoot: learn.microsoft.com/en-us/aspnet/core/fundamentals/host/… Commented Jun 18, 2018 at 7:04
  • It would be more helpful (and more likely to get an answer) if you provide the actual error/exception you're getting, or at least describe the behavior you're characterizing as an error. And if you resolved this issue, would you kindly answer your own post with a simple detail of the resolution? Commented Sep 12, 2018 at 14:06

2 Answers 2

4

As Dekim mentioned a service should be registered.

Please have a look at an example I created on GitHub.

Program.cs looks like this:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;

namespace Core
{
    /// <summary>
    /// Console entry point that hosts the RabbitMQ background service.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Builds a generic host, registers the hosted service, and runs it as a console application.
        /// </summary>
        static async Task Main(string[] args)
        {
            IHostBuilder builder = new HostBuilder();
            builder.ConfigureServices(RegisterServices);

            await builder.RunConsoleAsync();
        }

        /// <summary>
        /// Adds the application's services to the container.
        /// </summary>
        private static void RegisterServices(HostBuilderContext hostContext, IServiceCollection services)
        {
            // register our service here
            services.AddHostedService<ServiceRabbitMQ>();
        }
    }
}
Sign up to request clarification or add additional context in comments.

Comments

1

Because the IHostedService requires the creation of a special scope according to the documentation.

From the Microsoft documentation cited in the above answer :

No scope is created for a hosted service by default.

Consider using : services.AddHostedService<MyHostedService>();

1 Comment

Welcome to StackOverflow, it would be great if you could add a bit more detail of why IHostedService is a "special class" and why it needs to be injected this way. That way the person asking the question might understand the reason a bit better.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.