
I am writing code for a wallet system. At some point we receive a webhook from our payment provider notifying us of a successful deposit. The event is saved and then queued as a background job using Hangfire.

The function that processes this event finds a transaction with a matching reference and amount, validates it, and credits the user's wallet. I am having problems managing concurrency because of the way I designed the system.

The aim was to use C#'s ConcurrentDictionary and the lock statement to lock on a transaction reference (and on wallet IDs), so that different transactions can be processed, and different wallets manipulated, at the same time.

I wrote a test to check that only one thread at a time is allowed into this function when the job is queued multiple times. The test looks like this:

        public async Task CompleteWalletDepositAsync_ConcurrentCalls_ProcessesOnlyOnce()
        {
            // Arrange
            var transactionReference = "TEST_REF_001";
            var description = "test";
            var userId = Guid.NewGuid().ToString();

            var userWallet = new WalletBuilder().WithOwnerId(userId).WithOwnerType(OwnerType.User).WithBalance(0).Build();
            var walletId = await CreateWalletAsync(userWallet);

            var ledgerWallet = await unitOfWork.Wallets.GetLedgerWalletAsync(TransactionType.Deposit);

            var creditTransaction = new WalletTransactionBuilder(_fakeTimeProvider.GetUtcNow().UtcDateTime)
                .WithTransactionReference(transactionReference)
                .WithWalletId(walletId)
                .WithAmount(1000)
                .WithDescription(description)
                .WithCurrencyIsoCode(Currencies.NGN.ToString())
                .WithProcessor(Processor.Paystack.ToString())
                .WithWalletOwnerId(userId)
                .WithWalletOwnerType(OwnerType.User)
                .WithTransactionType(TransactionType.Deposit)
                .WithTransactionStatus(WalletTransactionStatus.Pending)
                .WithEntry(TransactionEntry.Cr)
                .Build();
            await CreateWalletTransactionAsync(creditTransaction);

            var debitTransaction = new WalletTransactionBuilder(_fakeTimeProvider.GetUtcNow().UtcDateTime)
                .WithTransactionReference(transactionReference)
                .WithWalletId(ledgerWallet!.Id)
                .WithAmount(1000)
                .WithDescription(description)
                .WithCurrencyIsoCode(Currencies.NGN.ToString())
                .WithProcessor("Processor")
                .WithWalletOwnerId(ledgerWallet.OwnerId)
                .WithWalletOwnerType(ledgerWallet.OwnerType)
                .WithTransactionType(TransactionType.Deposit)
                .WithTransactionStatus(WalletTransactionStatus.Pending)
                .WithEntry(TransactionEntry.Dr)
                .Build();
            await CreateWalletTransactionAsync(debitTransaction);

            var webhookEvent = new WebhookEvent
            {
                Reference = transactionReference,
                Event = Events.CHARGE_SUCCESS,
                Data = "{\"amount\":100000,\"paid_at\":\"2024-08-09T12:00:00Z\"}",
                Processor = "Processor"
            };
            await CreateWebhookEventAsync(webhookEvent);

            // Act
            var tasks = new List<Task>();
            for (int i = 0; i < 5; i++)
            {
                tasks.Add(Task.Run(() => transactionService.CompleteWalletDepositAsync(transactionReference)));
            }

            await Task.WhenAll(tasks);

            // Assert
            var updatedWallet = await unitOfWork.Wallets.GetByIdAsync(walletId);
            updatedWallet.Should().NotBeNull();
            updatedWallet!.Balance.Should().Be(1000, "The wallet should be credited only once");

            var updatedTransactions = await unitOfWork.WalletTransactions.GetFilteredTransactionsAsync(t =>
                t.TransactionReference == transactionReference
            );
            updatedTransactions.Should().HaveCount(2, "There should still be only two transactions");
            updatedTransactions.Should().AllSatisfy(t => t.TransactionStatus.Should().Be(WalletTransactionStatus.Successful));
        }

When I run the test, all threads process at the same time and the wallet balance ends up as 5 times what it ought to be. I expected all 5 calls to run but only the first to credit the wallet: once it commits, the transactions are marked as successful, so the remaining calls should find no pending transactions.

Output:

info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
      Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
      Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
      Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
      Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
      Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]

The code looks like this:

private readonly ConcurrentDictionary<string, object> _walletLocks = new();
private readonly ConcurrentDictionary<string, object> _transactionReferenceLocks = new();

lock (_transactionReferenceLocks.GetOrAdd(transactionReference, _ => new object()))
            {
                // Fetch the webhook event by reference and event type
                var webhookEvent = unitOfWork.WebhookEvents.GetByReferenceAndEvent(transactionReference, Events.CHARGE_SUCCESS);


                try
                {
                    unitOfWork.BeginTransaction();

                    var transactions = unitOfWork.WalletTransactions.GetFilteredTransactions(x =>
                        x.TransactionType == TransactionType.Deposit
                        && x.TransactionStatus == WalletTransactionStatus.Pending
                        && x.TransactionReference == transactionReference
                    );

                    if (transactions.Count == 0)
                    {
                        logger.LogWarning("No pending deposit transactions for transaction reference: {transactionReference}", transactionReference);
                        return;
                    }

                    // Find the credit and debit transactions
                    var creditTransaction = transactions.FirstOrDefault(x => x.Entry == TransactionEntry.Cr);
                    var debitTransaction = transactions.FirstOrDefault(x => x.Entry == TransactionEntry.Dr);
                    if (creditTransaction == null || debitTransaction == null)
                    {
                        logger.LogCritical(
                            "Credit or debit transaction not found for transaction reference: {transactionReference}",
                            transactionReference
                        );
                        return;
                    }

                    var walletToCredit = unitOfWork.Wallets.GetById(creditTransaction.WalletId);
                    var depositLedgerWallet = unitOfWork.Wallets.GetLedgerWallet(TransactionType.Deposit);


                    var chargeSuccessEvent = JsonSerializer.Deserialize<ChargeSuccessEvent>(webhookEvent.Data)!;
                    var transactionAmount = CurrencyUtility.ConvertKoboToNaira(chargeSuccessEvent.Amount);
                    if (creditTransaction.Amount != transactionAmount)
                    {
                        logger.LogCritical(
                            "Amount credited does not match the initialised amount. Credited: {creditedAmount}, Initialized: {initializedAmount}",
                            transactionAmount,
                            creditTransaction.Amount
                        );
                        return;
                    }

                    var transactionCompletionTimestampInUtc = chargeSuccessEvent.PaidAt.ToUniversalTime();

                    creditTransaction.TransactionTimestamp = transactionCompletionTimestampInUtc;
                    creditTransaction.TransactionStatus = WalletTransactionStatus.Successful;

                    debitTransaction.TransactionTimestamp = transactionCompletionTimestampInUtc;
                    debitTransaction.TransactionStatus = WalletTransactionStatus.Successful;

                    LockAndCreditWallet(transactionAmount, walletToCredit, transactionReference);
                    LockAndDebitWallet(transactionAmount, depositLedgerWallet, transactionReference);

                    unitOfWork.Commit();
                }
                finally
                {
                    _transactionReferenceLocks.TryRemove(transactionReference, out _);
                }
            }

            logger.LogInformation("Completed wallet deposit for transaction reference: {transactionReference}", transactionReference);
        }

The code for debiting & crediting wallets looks like this:

        private void LockAndDebitWallet(decimal amount, Wallet wallet, string transactionReference)
        {
            logger.LogInformation("Debiting wallet {walletId} for transaction reference {transactionReference}.", wallet.Id, transactionReference);

            logger.LogDebug("Locking wallet...");
            lock (_walletLocks.GetOrAdd(wallet.Id, _ => new object()))
            {
                try
                {
                    wallet.Balance -= amount;
                }
                finally
                {
                    _walletLocks.TryRemove(wallet.Id, out _);
                }
            }

            logger.LogDebug("Lock removed.");
            logger.LogInformation(
                "Debited wallet {walletId}. Removed {amount} {currency}. New Balance: {walletBalance}.",
                wallet.Id,
                amount,
                wallet.Currency,
                wallet.Balance
            );
        }

        private void LockAndCreditWallet(decimal amount, Wallet wallet, string transactionReference)
        {
            logger.LogInformation("Crediting wallet {walletId} for transaction reference {transactionReference}.", wallet.Id, transactionReference);

            logger.LogDebug("Locking wallet...");
            lock (_walletLocks.GetOrAdd(wallet.Id, _ => new object()))
            {
                try
                {
                    wallet.Balance += amount;
                }
                finally
                {
                    _walletLocks.TryRemove(wallet.Id, out _);
                }
            }

            logger.LogDebug("Lock removed.");
            logger.LogInformation(
                "Credited wallet {walletId}. Added {amount} {currency}. New Balance: {walletBalance}.",
                wallet.Id,
                amount,
                wallet.Currency,
                wallet.Balance
            );
        }

I am considering changing the approach to instead use a message queue. When events come in, they will be saved and published to the queue and listeners will process one transaction at a time - without trying to process others concurrently.
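
To make the queue idea concrete, here is a minimal in-process sketch. It assumes a single consumer and uses `System.Threading.Channels` as a stand-in for a real broker; the `pending` set and the fixed amount of 1000 are hypothetical placeholders for the database lookup and the webhook payload, not the actual service code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the queue; a real broker would replace the Channel.
var queue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });

// Hypothetical stand-ins for the database state.
var pending = new HashSet<string> { "TEST_REF_001" }; // references still awaiting credit
decimal balance = 0m;

// Single consumer: messages are handled strictly one after another.
var consumer = Task.Run(async () =>
{
    await foreach (var reference in queue.Reader.ReadAllAsync())
    {
        // Idempotency check: a duplicate delivery finds nothing pending and is skipped.
        if (!pending.Remove(reference)) continue;
        balance += 1000m;
    }
});

// Simulate the same job being queued five times.
for (var i = 0; i < 5; i++)
    await queue.Writer.WriteAsync("TEST_REF_001");

queue.Writer.Complete(); // lets ReadAllAsync finish once the queue is drained
await consumer;

Console.WriteLine(balance); // prints 1000
```

With one consumer there is no contention to manage, but throughput is limited to one message at a time unless messages are partitioned by key.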

But, of course, I'd like to make this work or at least learn why my current approach is a bad/terrible idea.
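
One property of the locking code above can be stated independently of the rest of the system: the lock object is removed from the dictionary in `finally` while other threads may still be blocked on it, so a caller arriving after the removal gets a brand-new object from `GetOrAdd` and does not contend with those earlier waiters. Whether this alone explains the 5x balance is not established here; the shared `unitOfWork` and the lifetime of the service instance may also play a part. Below is a minimal sketch, assuming the lock objects are simply never removed. `LockFor`, `pending`, `inside`, and `maxInside` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// One dictionary shared by every caller. Entries are never removed, so a given
// reference always maps to the same lock object.
var referenceLocks = new ConcurrentDictionary<string, object>();
object LockFor(string reference) => referenceLocks.GetOrAdd(reference, _ => new object());

var inside = 0;      // callers currently inside the critical section
var maxInside = 0;   // highest value of `inside` observed
var pending = true;  // hypothetical stand-in for "transaction is still Pending"
var balance = 0m;

var tasks = Enumerable.Range(0, 5).Select(_ => Task.Run(() =>
{
    lock (LockFor("TEST_REF_001"))
    {
        maxInside = Math.Max(maxInside, ++inside);
        if (pending)
        {
            Thread.Sleep(20);  // simulate work while holding the lock
            balance += 1000m;
            pending = false;   // later callers see nothing left to process
        }
        inside--;
    }
})).ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"balance={balance}, maxInside={maxInside}"); // prints balance=1000, maxInside=1
```

This only serializes callers inside one process, and the dictionary grows by one entry per distinct reference. Several Hangfire servers would still need a database-level or distributed guard.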

  • If you intend to use a Service Bus or Event Hub in Azure, you can write all transactions of the same wallet to a specific partition and then process multiple partitions in parallel, which guarantees that messages for the same wallet are processed sequentially. You could also scale that solution across several servers processing the messages, and you would have the option to re-process messages that fail for some reason. Commented Aug 9, 2024 at 11:58
  • Mind you that GetOrAdd is not entirely thread safe => andrewlock.net/… Commented Aug 9, 2024 at 12:23
  • I might not really understand the problem, but is it not possible to let the database handle the locking? Specifying a suitable IsolationLevel for the transaction might help. Commented Aug 9, 2024 at 12:35
  • I agree with your assessment: you should have started with a ConcurrentQueue. If you think about it, "draining" accounts is all about "first come, first served". And use "pessimistic" locking in this case: lock account; balance >= withdrawal ? do : don't do; unlock. Commented Aug 9, 2024 at 15:49
  • @MarkCiliaVincenti good plug. I will check it out. Commented Aug 12, 2024 at 8:18
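
The remark about `GetOrAdd` refers to the fact that `ConcurrentDictionary` may invoke the value factory more than once under contention, although only one result is stored and returned to every caller. A common way to guarantee a single invocation is to store a `Lazy<T>`. The sketch below assumes that pattern; `GetLock`, `created`, and the key `"wallet-1"` are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var cache = new ConcurrentDictionary<string, Lazy<object>>();
var created = 0; // how many times the inner factory actually ran

// Several Lazy wrappers may be constructed under contention, but only the stored
// one is returned to callers, and Lazy<T> runs its factory once by default.
object GetLock(string key) =>
    cache.GetOrAdd(key, _ => new Lazy<object>(() =>
    {
        Interlocked.Increment(ref created);
        return new object();
    })).Value;

var results = await Task.WhenAll(
    Enumerable.Range(0, 8).Select(_ => Task.Run(() => GetLock("wallet-1"))));

Console.WriteLine(results.Distinct().Count()); // prints 1: every caller got the same object
Console.WriteLine(created);                    // prints 1: the factory ran once
```

For plain lock objects a duplicate factory call is harmless, since the extra object is discarded; the wrapper matters mainly when the factory is expensive or has side effects.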
