I am writing code for a wallet system. At some point we receive a webhook from our payment provider notifying us of a successful deposit. The event is saved & stored and queued as a background job using Hangfire.
The function to process this event is to find a transaction with a matching reference and amount, validate the transaction & credit the user's wallet. I am having problems managing concurrency due to the way I designed the system.
The aim was to use C#'s concurrent dictionary and lock statement to lock on a transaction reference (and wallet ids) allowing us to process multiple transactions at the same time or manipulate different wallets at the same time.
I wrote a test to check that only 1 thread would be allowed to access this function in the scenario that the job is queued multiple times. The test looks like this:
It looks like this:
public async Task CompleteWalletDepositAsync_ConcurrentCalls_ProcessesOnlyOnce()
{
// Arrange
var transactionReference = "TEST_REF_001";
var description = "test";
var userId = Guid.NewGuid().ToString();
var userWallet = new WalletBuilder().WithOwnerId(userId).WithOwnerType(OwnerType.User).WithBalance(0).Build();
var walletId = await CreateWalletAsync(userWallet);
var ledgerWallet = await unitOfWork.Wallets.GetLedgerWalletAsync(TransactionType.Deposit);
var creditTransaction = new WalletTransactionBuilder(_fakeTimeProvider.GetUtcNow().UtcDateTime)
.WithTransactionReference(transactionReference)
.WithWalletId(walletId)
.WithAmount(1000)
.WithDescription(description)
.WithCurrencyIsoCode(Currencies.NGN.ToString())
.WithProcessor(Processor.Paystack.ToString())
.WithWalletOwnerId(userId)
.WithWalletOwnerType(OwnerType.User)
.WithTransactionType(TransactionType.Deposit)
.WithTransactionStatus(WalletTransactionStatus.Pending)
.WithEntry(TransactionEntry.Cr)
.Build();
await CreateWalletTransactionAsync(creditTransaction);
var debitTransaction = new WalletTransactionBuilder(_fakeTimeProvider.GetUtcNow().UtcDateTime)
.WithTransactionReference(transactionReference)
.WithWalletId(ledgerWallet!.Id)
.WithAmount(1000)
.WithDescription(description)
.WithCurrencyIsoCode(Currencies.NGN.ToString())
.WithProcessor("Processor")
.WithWalletOwnerId(ledgerWallet.OwnerId)
.WithWalletOwnerType(ledgerWallet.OwnerType)
.WithTransactionType(TransactionType.Deposit)
.WithTransactionStatus(WalletTransactionStatus.Pending)
.WithEntry(TransactionEntry.Dr)
.Build();
await CreateWalletTransactionAsync(debitTransaction);
var webhookEvent = new WebhookEvent
{
Reference = transactionReference,
Event = Events.CHARGE_SUCCESS,
Data = "{\"amount\":100000,\"paid_at\":\"2024-08-09T12:00:00Z\"}",
Processor = "Processor"
};
await CreateWebhookEventAsync(webhookEvent);
// Act
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(Task.Run(() => transactionService.CompleteWalletDepositAsync(transactionReference)));
}
await Task.WhenAll(tasks);
// Assert
var updatedWallet = await unitOfWork.Wallets.GetByIdAsync(walletId);
updatedWallet.Should().NotBeNull();
updatedWallet!.Balance.Should().Be(1000, "The wallet should be credited only once");
var updatedTransactions = await unitOfWork.WalletTransactions.GetFilteredTransactionsAsync(t =>
t.TransactionReference == transactionReference
);
updatedTransactions.Should().HaveCount(2, "There should still be only two transactions");
updatedTransactions.Should().AllSatisfy(t => t.TransactionStatus.Should().Be(WalletTransactionStatus.Successful));
}
When I run the test, all threads process at the same time and the wallet balance ends up as 5 times what it ought to be. The service ought to process 5 times, with only the first succeeding, as the transaction would have been marked as completed afterwards.
Output:
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
Processing wallet deposit with reference: TEST_REF_001. Time: 08/09/2024 12:15:21
info: LimestoneFinancialService.Infrastructure.Services.TransactionService[0]
The code looks like this:
private readonly ConcurrentDictionary<string, object> _walletLocks = new();
private readonly ConcurrentDictionary<string, object> _transactionReferenceLocks = new();
lock (_transactionReferenceLocks.GetOrAdd(transactionReference, () => new object()))
{
// Fetch the webhook event by reference and event type
var webhookEvent = unitOfWork.WebhookEvents.GetByReferenceAndEvent(transactionReference, Events.CHARGE_SUCCESS);
try
{
unitOfWork.BeginTransaction();
var transactions = unitOfWork.WalletTransactions.GetFilteredTransactions(x =>
x.TransactionType == TransactionType.Deposit
&& x.TransactionStatus == WalletTransactionStatus.Pending
&& x.TransactionReference == transactionReference
);
if (transactions.Count == 0)
{
logger.LogWarning("No pending deposit transactions for transaction reference: {transactionReference}", transactionReference);
return;
}
// Find the credit and debit transactions
var creditTransaction = transactions.FirstOrDefault(x => x.Entry == TransactionEntry.Cr);
var debitTransaction = transactions.FirstOrDefault(x => x.Entry == TransactionEntry.Dr);
if (creditTransaction == null || debitTransaction == null)
{
logger.LogCritical(
"Credit or debit transaction not found for transaction reference: {transactionReference}",
transactionReference
);
return;
}
var walletToCredit = unitOfWork.Wallets.GetById(creditTransaction.WalletId);
var depositLedgerWallet = unitOfWork.Wallets.GetLedgerWallet(TransactionType.Deposit);
var chargeSuccessEvent = JsonSerializer.Deserialize<ChargeSuccessEvent>(webhookEvent.Data)!;
var transactionAmount = CurrencyUtility.ConvertKoboToNaira(chargeSuccessEvent.Amount);
if (creditTransaction.Amount != transactionAmount)
{
logger.LogCritical(
"Amount credited does not match the initialised amount. Credited: {creditedAmount}, Initialized: {initializedAmount}",
transactionAmount,
creditTransaction.Amount
);
return;
}
var transactionCompletionTimestampInUtc = chargeSuccessEvent.PaidAt.ToUniversalTime();
creditTransaction.TransactionTimestamp = transactionCompletionTimestampInUtc;
creditTransaction.TransactionStatus = WalletTransactionStatus.Successful;
debitTransaction.TransactionTimestamp = transactionCompletionTimestampInUtc;
debitTransaction.TransactionStatus = WalletTransactionStatus.Successful;
LockAndCreditWallet(transactionAmount, walletToCredit, transactionReference);
LockAndDebitWallet(transactionAmount, depositLedgerWallet, transactionReference);
unitOfWork.Commit();
}
finally
{
_transactionReferenceLocks.TryRemove(transactionReference, out _);
}
}
logger.LogInformation("Completed wallet deposit for transaction reference: {transactionReference}", transactionReference);
}
The code for debiting & crediting wallets looks like this:
private void LockAndDebitWallet(decimal amount, Wallet wallet, string transactionReference)
{
logger.LogInformation("Debiting wallet {walletId} for transaction reference {transactionReference}.", wallet.Id, transactionReference);
logger.LogDebug("Locking wallet...");
lock (_walletLocks.GetOrAdd(wallet.Id, () => new object()))
{
try
{
wallet.Balance -= amount;
}
finally
{
_walletLocks.TryRemove(wallet.Id, out _);
}
}
logger.LogDebug("Lock removed.");
logger.LogInformation(
"Debited wallet {walletId}. Removed {amount} {currency}. New Balance: {walletBalance}.",
wallet.Id,
amount,
wallet.Currency,
wallet.Balance
);
}
private void LockAndCreditWallet(decimal amount, Wallet wallet, string transactionReference)
{
logger.LogInformation("Crediting wallet {walletId} for transaction reference {transactionReference}.", wallet.Id, transactionReference);
logger.LogDebug("Locking wallet...");
lock (_walletLocks.GetOrAdd(wallet.Id, () => new object()))
{
try
{
wallet.Balance += amount;
}
finally
{
_walletLocks.TryRemove(wallet.Id, out _);
}
}
logger.LogDebug("Lock removed.");
logger.LogInformation(
"Credited wallet {walletId}. Added {amount} {currency}. New Balance: {walletBalance}.",
wallet.Id,
amount,
wallet.Currency,
wallet.Balance
);
}
I am considering changing the approach to instead use a message queue. When events come in, they will be saved and published to the queue and listeners will process one transaction at a time - without trying to process others concurrently.
But, of course, I'd like to make this work or at least learn why my current approach is a bad/terrible idea.
GetOrAddis not entirely thread safe => andrewlock.net/…