I have implemented RabbitMQ in my CodeIgniter project to handle a file upload system. Users can upload multiple files at once. After uploading the files, the IDs of the uploaded files are added to a RabbitMQ queue, and then RabbitMQ processes these files in the background. In the background process, various tasks like OCR scanning are performed.
To handle the background processing, I created a worker via the terminal with the following:
php index.php JobProcessing/process_progress_rabbitmq
This works fine when a single user is uploading files. However, when there are multiple users (e.g., 50 users at once), each user has to wait for the previous user's tasks to complete. I want to implement parallel processing so that users don't have to wait for their turn.
One solution I considered is creating multiple workers, but I'm concerned about the limit on the number of workers that can be created. If there are, say, 500 users uploading at the same time, how many workers can I safely create? Is there a recommended approach for handling this scenario, especially considering system resources and RabbitMQ limits?
Any insights or recommendations would be greatly appreciated!
below is my code:
<?php
defined('BASEPATH') or exit('No direct script access allowed');
require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Spatie\Async\Pool;
class Rabbit_mq
{
protected $ci, $connection, $channel, $pool;
public function __construct()
{
try {
$this->ci = &get_instance();
$this->ci->load->model('invoice_scan_model');
$this->ci->load->library('scan_invoice_lib');
$this->ci->load->library('quick_books');
$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
} catch (Exception $e) {
echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n";
}
}
function addToQueue($data)
{
$this->channel->queue_declare('file_processing_new', false, true, false, false);
$msg = new AMQPMessage(json_encode($data));
$this->channel->basic_publish($msg, '', 'file_processing_new');
}
public function processQueue()
{
$this->channel->queue_declare('file_processing_new', false, true, false, false);
// Set prefetch count to allow multiple messages to be handled concurrently
$this->channel->basic_qos(null, 15, null);
$callback = function ($msg) {
$data = json_decode($msg->body, true);
$fileId = $data['file_id'];
$filePath = $data['file_path'];
try {
echo "Received message for file ID: $fileId\n";
// Initialize the async pool
$this->pool = Pool::create();
// Add task to the pool for parallel processing
$this->pool->add(function () use ($fileId, $filePath) {
try {
echo "Processing file ID: $fileId\n";
$data = array(
'progress_status' => 'Processing',
'progress_percentage' => 50,
);
$this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
echo "File ID: $fileId marked as Processing.\n";
// Simulate file processing (OCR or other logic here)
$this->process_invoice($fileId);
// Update status to 'Completed' after processing
$data = array(
'progress_status' => 'Completed',
'progress_percentage' => 100,
);
$this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
echo "File ID: $fileId processing completed.\n";
} catch (Exception $e) {
echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
}
});
// Wait for all tasks to finish
$this->pool->wait(); // This will wait until all tasks are done
// Acknowledge the message after all tasks are processed
$this->channel->basic_ack($msg->delivery_info['delivery_tag']);
echo "Acknowledging message for file ID: $fileId\n";
} catch (Exception $e) {
echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
$this->channel->basic_nack($msg->delivery_info['delivery_tag']);
}
};
// Multiple consumers (workers) consuming the messages
for ($i = 0; $i < 25; $i++) {
$this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback);
}
echo "Waiting for messages. To exit press CTRL+C\n";
// Consume messages concurrently by multiple workers
while ($this->channel->callbacks) {
$this->channel->wait();
}
// Close the channel and connection when done
$this->channel->close();
$this->connection->close();
}
}