I'm currently implementing a wallet top-up system for an application, using a job called AdjustWalletBalance to handle it. The job simply reads the client's current wallet balance and increases it by a given amount. Below is the code for the job:
use App\Events\Tenant\WalletTopupSuccessful;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;

class AdjustWalletBalance implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        /**
         * The amount to add to wallet balance. Can be negative for deductions.
         */
        public float $amount,

        /**
         * User whose wallet should be adjusted.
         */
        public User $user,
    ) {
        $this->onQueue('wallets');
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $process_id = mt_rand();
        $wallet = $this->user->wallet;

        \Log::debug("starting $process_id at: " . now());
        \Log::debug("adjusting wallet balance with amount: {$this->amount}");
        \Log::debug("current balance for $process_id: {$wallet->balance}");

        // Increasing the running time for the job to make it easier to have
        // two simultaneous jobs running for the sake of this demonstration.
        sleep(10);

        $wallet->balance = $wallet->balance + $this->amount;
        $wallet->save();

        \Log::debug("$process_id updated balance to {$wallet->balance}");
    }

    public function middleware()
    {
        return [(new WithoutOverlapping($this->user->id))];
    }
}
Because handling a user's wallet balance is a very sensitive task, I don't want multiple instances of the job to run at the same time for a particular user, since that leads to race conditions and incorrect balance updates. So I added the WithoutOverlapping middleware to the job, using the user's id as the key. But it doesn't work: I can still execute the job in parallel for the exact same user even though I added the middleware.
I have a route entry like this:
Route::get('/queue-test', function () {
    dispatch(new \App\Jobs\Tenant\AdjustWalletBalance(50, User::first()));
});
I'm running two queue workers simultaneously, and if I visit the route twice, I can immediately see that each worker starts processing an AdjustWalletBalance job, which should not happen. And when I check my log files to further verify the sequence of execution of the jobs, I see this:
[2023-09-24 19:12:42] local.DEBUG: starting 1007152283 at: 2023-09-24 19:12:42
[2023-09-24 19:12:42] local.DEBUG: adjusting wallet balance with amount: 50
[2023-09-24 19:12:42] local.DEBUG: current balance for 1007152283: 0
[2023-09-24 19:12:43] local.DEBUG: starting 241490440 at: 2023-09-24 19:12:43
[2023-09-24 19:12:43] local.DEBUG: adjusting wallet balance with amount: 50
[2023-09-24 19:12:43] local.DEBUG: current balance for 241490440: 0
[2023-09-24 19:12:52] local.DEBUG: 1007152283 updated balance to 50
[2023-09-24 19:12:53] local.DEBUG: 241490440 updated balance to 50
As you might already have noticed, this is a HUGE problem. The user's wallet was credited twice with #50, but because the two jobs are running concurrently, the user's wallet balance gets updated to #50 instead of #100!
I've already set the CACHE_DRIVER to array, so I don't think it's a configuration issue.
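One thing that may be worth checking here: the array cache driver keeps its data in the memory of the current PHP process, so two separate queue workers would each see their own lock store. Below is a minimal sketch for checking whether a lock is really shared between processes. It assumes the snippet is pasted into two php artisan tinker sessions at roughly the same time; the lock name wallet-lock-check is made up for illustration.

```php
// Hypothetical diagnostic, not part of the job itself.
// Run it in two separate processes within a few seconds of each other.
$lock = \Illuminate\Support\Facades\Cache::lock('wallet-lock-check', 30);

if ($lock->get()) {
    // This process now holds the lock in the default cache store.
    echo 'lock acquired by process ' . getmypid() . PHP_EOL;

    // Keep holding it long enough for the second process to try.
    sleep(20);
    $lock->release();
} else {
    // Another process holds the lock, so the store is shared.
    echo 'lock already held, seen from process ' . getmypid() . PHP_EOL;
}
```

If both sessions print "lock acquired", the two processes are not sharing a lock store, and any middleware that relies on cache locks cannot see the other worker's lock.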
I also tried using the ShouldBeUnique contract and setting the user's id as the uniqueId, like so:
class AdjustWalletBalance implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(/**/)
    { /* Code */ }

    public function uniqueId(): int
    {
        return $this->user->id;
    }

    public function handle()
    { /* Code */ }
}
But I still get the exact same results.
If I kill the other queue worker and leave only one running, then subsequent instances of the job wait in the queue and the user's wallet is updated correctly. But the application will have several hundred users, and I need multiple queue workers running simultaneously to handle their wallet updates quickly, so a single queue worker is not an option.
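The lost update in the log comes from the read-then-write in handle(): both jobs read a balance of 0 and both write 50. Independently of the queue middleware, one possible way to make that step safe is to let the database do the addition. This is a minimal sketch, assuming the User model lives in App\Models (the question does not show its namespace) and that balance is a plain numeric column:

```php
<?php

namespace App\Jobs\Tenant;

use App\Models\User; // assumed namespace, not shown in the question
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class AdjustWalletBalance implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(public float $amount, public User $user)
    {
        $this->onQueue('wallets');
    }

    public function handle(): void
    {
        $wallet = $this->user->wallet;

        // Issues an "UPDATE ... SET balance = balance + ?" query, so the
        // addition is applied to the stored value rather than to a value
        // that was read earlier in PHP.
        $wallet->increment('balance', $this->amount);

        // Reload the row so the log shows what is actually stored.
        $wallet->refresh();
        Log::debug("balance is now {$wallet->balance}");
    }
}
```

With this shape, two jobs that overlap would each add their amount to whatever is stored at the time of the update, instead of overwriting each other's result.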

AdjustWalletBalance::dispatch(50, User::first()) instead of dispatch(new AdjustWalletBalance(...))?

From the docs (WithoutOverlapping): "To utilize this feature, your application must be using the memcached, redis, dynamodb, database, file, or array cache driver as your application's default cache driver. In addition, all servers must be communicating with the same central cache server." So array seems to be a valid option (although the docs suggest using it for automated tests, so it's not totally clear).

I hadn't used dispatch as a static method before, but after reading your comment I tried it and it still gave the same results. I also tried using the database cache driver before posting the question, as I guessed it might be a problem with the array driver; I ran the necessary migrations to create the cache and cache_locks tables, then tested my code again. It didn't make any difference, so I'm almost certain it's not about the cache driver.

Besides WithoutOverlapping, check also pessimistic locking: laravel.com/docs/10.x/queries#pessimistic-locking. Though this is for DB updates rather than making sure that the queue does not run jobs in parallel.
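The pessimistic locking suggested in the last comment could look roughly like the sketch below. It assumes that wallet is defined as a relationship method on User (the question only shows it used as a property), that User lives in App\Models, and that the database supports row locks (SELECT ... FOR UPDATE).

```php
<?php

namespace App\Jobs\Tenant;

use App\Models\User; // assumed namespace, not shown in the question
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;

class AdjustWalletBalance implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(public float $amount, public User $user)
    {
        $this->onQueue('wallets');
    }

    public function handle(): void
    {
        DB::transaction(function () {
            // Re-read the wallet inside the transaction with a row lock.
            // A second job selecting the same row with lockForUpdate()
            // waits here until this transaction commits or rolls back.
            // wallet() is assumed to be a relationship method on User.
            $wallet = $this->user->wallet()->lockForUpdate()->firstOrFail();

            $wallet->balance += $this->amount;
            $wallet->save();
        });
    }
}
```

This does not stop two workers from picking up jobs for the same user at once; it only serializes the read and write of the balance row, which is the part that produced the wrong total in the log.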