I'm implementing a wallet top-up system for an application, using a job called AdjustWalletBalance. The job reads the client's current wallet balance and increases it by a given amount. Below is the code for the job:

use App\Events\Tenant\WalletTopupSuccessful;
use App\Models\User; // assumed location of the User model
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;

class AdjustWalletBalance implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        /**
         * The amount to add to wallet balance. Can be negative for deductions.
         */
        public float $amount,
        /**
         * User whose wallet should be adjusted.
         */
        public User $user,
    )
    {
        $this->onQueue('wallets');
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $process_id = mt_rand();
        $wallet = $this->user->wallet;
        \Log::debug("starting $process_id at: " . now());
        \Log::debug("adjusting wallet balance with amount: {$this->amount}");
        \Log::debug("current balance for $process_id: {$wallet->balance}");

        // Increasing the running time for the job to make it easier to have 
        // two simultaneous jobs running for the sake of this demonstration.
        sleep(10);

        $wallet->balance = $wallet->balance + $this->amount;
        $wallet->save();
        \Log::debug("$process_id updated balance to {$wallet->balance}");
    }

    public function middleware()
    {
        return [(new WithoutOverlapping($this->user->id))];
    }

}

Because handling a user's wallet balance is a very sensitive task, I don't want multiple instances of the job to run at once for the same user, since that leads to race conditions and incorrect balances. So I added the WithoutOverlapping middleware to the job, using the user's id as the lock key. But it doesn't work: I can still execute the job in parallel for the exact same user even with the middleware in place.

I have a route entry like this:

Route::get('/queue-test', function() {
    dispatch(new \App\Jobs\Tenant\AdjustWalletBalance(50, User::first()));
});

I'm running two queue workers simultaneously, and if I visit the route twice, I can immediately see that each worker starts processing an AdjustWalletBalance job, which should not happen:

[Image: split terminals showing that the two jobs run concurrently]

When I check my log files to further verify the sequence of execution of the jobs, I see this:

[2023-09-24 19:12:42] local.DEBUG: starting 1007152283 at: 2023-09-24 19:12:42  
[2023-09-24 19:12:42] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-24 19:12:42] local.DEBUG: current balance for 1007152283: 0  
[2023-09-24 19:12:43] local.DEBUG: starting 241490440 at: 2023-09-24 19:12:43
[2023-09-24 19:12:43] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-24 19:12:43] local.DEBUG: current balance for 241490440: 0  
[2023-09-24 19:12:52] local.DEBUG: 1007152283 updated balance to 50  
[2023-09-24 19:12:53] local.DEBUG: 241490440 updated balance to 50  

As you might already have noticed, this is a HUGE problem. The user's wallet was credited twice with #50, but because the two jobs are running concurrently, the user's wallet balance gets updated to #50 instead of #100!
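
To make the lost update concrete, here is a minimal sketch in plain PHP that replays the interleaving from the log. It is only an illustration: the variables stand in for the database row and the two workers, and nothing here touches Laravel, the queue, or MySQL.

```php
<?php
// Replays the read/write order seen in the log using plain integers.
// All variable names are made up for this illustration.

$storedBalance = 0; // stands in for the wallet's balance column

// Both workers read the balance before either one writes.
$readByWorkerA = $storedBalance; // 0
$readByWorkerB = $storedBalance; // 0

// Each worker adds 50 to the value it read and saves the result.
$storedBalance = $readByWorkerA + 50; // worker A saves 50
$storedBalance = $readByWorkerB + 50; // worker B overwrites it with 50

echo "interleaved: {$storedBalance}\n"; // prints "interleaved: 50"

// The same two credits applied one after the other.
$serialBalance = 0;
$serialBalance = $serialBalance + 50; // first job: 50
$serialBalance = $serialBalance + 50; // second job reads 50, saves 100

echo "serial: {$serialBalance}\n"; // prints "serial: 100"
```

The second write is computed from a stale read, so one credit is silently lost. Any fix has to make the read and the write of one job happen without another job's read in between.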

I've already set the CACHE_DRIVER to array, so I don't think it's a configuration issue.

I also tried using the ShouldBeUnique contract and setting the user's id as the uniqueId, like so:

class AdjustWalletBalance implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(/**/)
    { /* Code */ }

    public function uniqueId(): int
    {
        return $this->user->id;
    }

    public function handle()
    { /* Code */ }
}

But I still get the exact same results.

If I kill the other queue worker and leave only one running, subsequent instances of the job wait in the queue and the user's wallet is updated correctly. But the application will have several hundred users, and I need multiple queue workers running simultaneously to handle their wallet updates quickly, so a single queue worker is not an option.

  • I would not expect the cache driver set to array to work. Is using Redis an option? Commented Sep 24, 2023 at 21:28
  • Have you tried to use AdjustWalletBalance::dispatch(50, User::first()) instead of dispatch(new AdjustWalletBalance(...))? Commented Sep 24, 2023 at 21:37
  • @apokryfos Laravel's official doc states this about Atomic Locks (used by WithoutOverlapping): "To utilize this feature, your application must be using the memcached, redis, dynamodb, database, file, or array cache driver as your application's default cache driver. In addition, all servers must be communicating with the same central cache server." So array seems to be a valid option (although the docs suggest using it for automated tests, so it's not totally clear). Commented Sep 24, 2023 at 21:51
  • @AlbertoFecchi I hadn't tested it by calling dispatch as a static method before, but after reading your comment I tried it and it still gave the same results. I also tried using the database cache driver before posting the question as I guessed it might be a problem with the array driver; I ran the necessary migrations to create the cache and cache_locks table, then tested my code again. It didn't make any difference, so I'm almost certain it's not about the cache driver. Commented Sep 24, 2023 at 23:39
  • Aside from WithoutOverlapping, also check pessimistic locking: laravel.com/docs/10.x/queries#pessimistic-locking. Though this is for DB updates rather than for making sure that queued jobs do not run in parallel. Commented Sep 25, 2023 at 0:19

1 Answer

I edited my code to this:

class AdjustWalletBalance implements ShouldQueue {

    /* Skipped unchanged parts of the code */

    public ?string $process_id = null;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        // Cast to string so the value matches the property's declared type.
        $process_id = (string) mt_rand();
        $this->process_id = $process_id;

        // NOTE: Perform any queries relating to the row you wish to lock before you start
        // the transaction. Once the transaction begins, the same row will keep being 
        // returned regardless of any changes that occurred outside the transaction
        $wallet_id = $this->user->wallet->id;

        // P.S use DB::transaction() instead, S.O's syntax highlighting went 
        // wonky when I used it so I changed to beginTransaction instead
        \DB::beginTransaction();
            // Obtain lock on the current user's wallet. Now only our transaction can modify it.
            \DB::statement('select * from wallets where id = ? for update', [$wallet_id]);

            // Re-query the wallet now that the lock is held; $this->user->wallet would
            // return the relation instance already cached by the lookup above.
            $wallet = $this->user->wallet()->first();
            \Log::debug("starting $process_id at: " . now());
            \Log::debug("adjusting wallet balance with amount: {$this->amount}");
            \Log::debug("current balance for $process_id: {$wallet->balance}");
            sleep(10);

            $wallet->balance += $this->amount;
            $wallet->save();
        \DB::commit();
        \Log::debug("$process_id updated balance to {$wallet->balance}");
    }

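    // Laravel calls failed(), not fail(), when a job fails; fail() is the
    // InteractsWithQueue helper for failing a job manually.
    public function failed(\Throwable $th)
    {
        \DB::rollback(); // Remove this line if you're using DB::transaction
        $process_id = $this->process_id ?? 'unknown';
        \Log::debug("{$process_id} failed with exception:\n" . $th->getMessage());
    }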

}

Now checking my log files gives me:

[2023-09-25 01:31:53] local.DEBUG: starting 878985182 at: 2023-09-25 01:31:53  
[2023-09-25 01:31:53] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-25 01:31:53] local.DEBUG: current balance for 878985182: 0  
[2023-09-25 01:32:03] local.DEBUG: starting 2126825516 at: 2023-09-25 01:32:03  
[2023-09-25 01:32:03] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-25 01:32:03] local.DEBUG: current balance for 2126825516: 50  
[2023-09-25 01:32:03] local.DEBUG: 878985182 updated balance to 50  
[2023-09-25 01:32:13] local.DEBUG: 2126825516 updated balance to 100

Success! Checking my database also verifies that the user's wallet was updated correctly.

And a screenshot of the queue workers shows that the workers no longer take exactly 10 seconds each:

[Image: terminal showing that the processes do not execute in parallel]

This means that MySQL does not let the second worker obtain a lock on the user's wallet row until the first worker's transaction has committed, thus eliminating parallel wallet updates.
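
For reference, the same row lock can be expressed with DB::transaction() and the query builder's lockForUpdate(), which adds FOR UPDATE to the select. The following is a minimal sketch under assumptions, not code that was run against the setup above: the helper name creditWallet is hypothetical, the User model is assumed to live in App\Models, and wallet() is assumed to be an Eloquent relation on it.

```php
<?php

use App\Models\User; // assumed model namespace
use Illuminate\Support\Facades\DB;

/**
 * Hypothetical helper: adjust a user's wallet while holding a row lock.
 */
function creditWallet(User $user, float $amount): void
{
    DB::transaction(function () use ($user, $amount) {
        // Calling wallet() as a method builds a fresh query, so the row is
        // re-read under the lock instead of reusing a cached relation.
        $wallet = $user->wallet()->lockForUpdate()->firstOrFail();

        $wallet->balance += $amount;
        $wallet->save();
    });
    // DB::transaction() commits when the closure returns and rolls back
    // if it throws, so no manual rollback is needed.
}
```

Inside the job, handle() could then reduce to a single call such as creditWallet($this->user, $this->amount). A second worker running the same call for the same user blocks on the locked select until the first transaction finishes.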

Resources that led me to this solution:

1 Comment

You can also use pessimistic locking in the query builder
