
I have implemented RabbitMQ in my CodeIgniter project to handle a file upload system. Users can upload multiple files at once. Once the files are uploaded, their IDs are added to a RabbitMQ queue, and RabbitMQ processes them in the background, where tasks such as OCR scanning are performed.

To handle the background processing, I created a worker via the terminal with the following command:

php index.php JobProcessing/process_progress_rabbitmq

This works fine when a single user is uploading files. However, when there are multiple users (e.g., 50 users at once), each user has to wait for the previous user's tasks to complete. I want to implement parallel processing so that users don't have to wait for their turn.

One solution I considered is creating multiple workers, but I'm concerned about the limit on the number of workers that can be created. If there are, say, 500 users uploading at the same time, how many workers can I safely create? Is there a recommended approach for handling this scenario, especially considering system resources and RabbitMQ limits?

Any insights or recommendations would be greatly appreciated!

Below is my code:

<?php

defined('BASEPATH') or exit('No direct script access allowed');
require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Spatie\Async\Pool;

class Rabbit_mq
{
    protected $ci, $connection, $channel, $pool;

    public function __construct()
    {
        try {
            $this->ci = &get_instance();
            $this->ci->load->model('invoice_scan_model');
            $this->ci->load->library('scan_invoice_lib');
            $this->ci->load->library('quick_books');
            $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();
        } catch (Exception $e) {
            echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n";
        }
    }

    public function addToQueue($data)
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);

        $msg = new AMQPMessage(json_encode($data));
        $this->channel->basic_publish($msg, '', 'file_processing_new');
    }

    public function processQueue()
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);

        // Limit unacknowledged messages per consumer (prefetch); this caps
        // in-flight deliveries but does not by itself create concurrency
        $this->channel->basic_qos(null, 15, null);

        $callback = function ($msg) {
            $data = json_decode($msg->body, true);
            $fileId = $data['file_id'];
            $filePath = $data['file_path'];

            try {
                echo "Received message for file ID: $fileId\n";

                // Initialize the async pool
                $this->pool = Pool::create();

                // Add task to the pool for parallel processing
                $this->pool->add(function () use ($fileId, $filePath) {
                    try {
                        echo "Processing file ID: $fileId\n";
                        $data = array(
                            'progress_status' => 'Processing',
                            'progress_percentage' => 50,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId marked as Processing.\n";

                        // Simulate file processing (OCR or other logic here)
                        $this->process_invoice($fileId);

                        // Update status to 'Completed' after processing
                        $data = array(
                            'progress_status' => 'Completed',
                            'progress_percentage' => 100,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId processing completed.\n";
                    } catch (Exception $e) {
                        echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                    }
                });

                // Wait for all tasks to finish
                $this->pool->wait(); // This will wait until all tasks are done

                // Acknowledge the message after all tasks are processed
                $this->channel->basic_ack($msg->delivery_info['delivery_tag']);
                echo "Acknowledging message for file ID: $fileId\n";
            } catch (Exception $e) {
                echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                $this->channel->basic_nack($msg->delivery_info['delivery_tag']);
            }
        };

        // Register multiple consumers on the same channel (they all share one
        // connection and one event loop, so callbacks still run one at a time)
        for ($i = 0; $i < 25; $i++) {
            $this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback);
        }

        echo "Waiting for messages. To exit press CTRL+C\n";

        // Single-threaded event loop: dispatches messages to the registered callbacks
        while ($this->channel->callbacks) {
            $this->channel->wait();
        }

        // Close the channel and connection when done
        $this->channel->close();
        $this->connection->close();
    }
}
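
A common work-queue pattern with php-amqplib is to keep each worker process single-threaded (prefetch of 1, one `basic_consume` per process) and get parallelism by running several copies of the worker process, since multiple `basic_consume` registrations inside one process still execute sequentially in the `$this->channel->wait()` loop. As a hedged sketch, the worker command from the question could be run under supervisord like this (the program name, path, and `numprocs` value are assumptions; size the process count via load testing):

```
; hypothetical supervisord config -- program name, path and numprocs are assumptions
[program:rabbitmq_worker]
command=php /var/www/html/index.php JobProcessing/process_progress_rabbitmq
process_name=%(program_name)s_%(process_num)02d
numprocs=8              ; number of parallel worker processes
autostart=true
autorestart=true        ; restart a worker if it crashes
stopasgroup=true
```

Each process then opens its own AMQP connection, and RabbitMQ round-robins messages across them, so one slow file no longer blocks the whole queue.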
3 Comments

  • There's no strict limit on how many workers you can run, but you should align concurrency with the CPU, memory, and I/O capacity of your environment. You can start with a modest number of workers, monitor system performance, and gradually increase if you have headroom. Prefetch settings in RabbitMQ and container orchestration strategies can help you manage and scale parallel processing more effectively. Commented Jan 31 at 13:50
  • OK @KamyarSafari, but my problem remains the same: if I create 20 workers and there are 500 users at the same time, the 500th user still has to wait, and that may take a long time. Commented Jan 31 at 15:09
  • Even with more workers, there's a hardware limit on how many tasks can run at once. If 20 workers can't handle 500 users, you might need to spread the load across multiple servers or containers and scale up resources. Monitor performance closely to keep queues moving and reduce wait times. I don't think there is any other way. Commented Jan 31 at 15:45

1 Answer


First of all, you have to find out how many workers you can safely create. That is a load test, and you run it on an identical system so that you don't kill your production box.

Let's say you find out that one system can handle 50 users at once.

Then you formulate the requirement for how many concurrent users must always be served. Let's say 500.

Then you fire up 10 worker instances of that type and keep them running at all times to meet your requirement (10 x 50 = 500).

On top of that, for the number of users you actually have right now, say 250, you spawn an additional five workers, so that you always retain the ability to serve 500 more users at once (15 x 50 = 750).

And if you then drop to 100 users, you can scale down by 3 machines (12 x 50 = 600).

And so on, and so forth.

Always keep enough spare capacity ahead of demand, and constantly add and destroy machines so you become accustomed to doing it; that way you will always have enough working power available for your users.

This calculation is not entirely exact, but it is a good training ground to run with for the first weeks while you gather the metrics you need in order to plan ahead.
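
The capacity arithmetic above can be sketched as a small script; the figures (50 users per worker from the load test, a 500-user requirement, 250 current users) are the hypothetical numbers used in this answer:

```shell
# Hypothetical figures from the answer: a load test shows one worker
# instance handles 50 concurrent users, and 500 must always be served.
PER_WORKER=50
REQUIRED=500
CURRENT_USERS=250

# Baseline workers needed to cover the requirement (ceiling division)
BASELINE=$(( (REQUIRED + PER_WORKER - 1) / PER_WORKER ))

# Total workers to keep 500 users of headroom above the current load
TOTAL=$(( (CURRENT_USERS + REQUIRED + PER_WORKER - 1) / PER_WORKER ))

echo "baseline=$BASELINE total=$TOTAL"   # baseline=10 total=15
```

Plugging in the other scenario from the answer (100 current users) gives a total of 12 workers, matching the 12 x 50 = 600 figure above.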


2 Comments

Is this like a "dynamic scaling strategy"?
Meaning, my code is in good shape and I don't have to change anything in my code section; everything will be done by the server team and marketing team, correct?
