I have C# code that reads a .TXT file, processes each line, and subsequently inserts documents into a MongoDB database. Since the file contains many lines (approximately 70k), both reading all lines and inserting everything into the database take too long.
To try to optimize this process, I attempted:
- Using `bulkWrite` to speed up the database I/O operations;
- Transforming the `bulkWrite` into an async task, so that the next batch can be populated while the previous one is still being inserted.
After these changes, the core of my code is configured similarly to:
/// <summary>
/// Extracts today's authorized-series file (SI_D_SEDE.txt) from the downloaded ZIP,
/// parses its "02" (equity option) and "03" (index option) rows, and synchronizes the
/// Tickers / HistoricalTickers / Proceeds collections. Proceeds writes are batched
/// (<see cref="BulkBatchSize"/> models) and pipelined: the next batch is accumulated
/// while the previous bulk write is still in flight.
/// </summary>
public async Task ProcessTickersAsync()
{
    string tempPath = Path.GetTempPath();
    string today = DateTime.Now.ToString("yyyy-MM-dd");
    string zipFileName = $"ListaCompletaSeriesAutorizadas_{today}.zip";
    string zipFilePath = Path.Combine(tempPath, zipFileName);

    if (!File.Exists(zipFilePath))
    {
        _variables.Errors.Add($"Arquivo {zipFileName} não encontrado no caminho {tempPath}.");
        _variables.taskLogger_.LogError($"Arquivo {zipFileName} não encontrado no caminho {tempPath}.");
        return;
    }

    string originalFileName = "SI_D_SEDE.txt";
    string newFilePath = Path.Combine(tempPath, $"SI_D_SEDE_{DateTime.Now:yyyyMMdd}.txt");

    // Extract the TXT out of the ZIP, then close the archive before processing:
    // everything below reads from the extracted file, not the archive.
    using (ZipArchive archive = ZipFile.OpenRead(zipFilePath))
    {
        ZipArchiveEntry entry = archive.GetEntry(originalFileName);
        if (entry == null)
        {
            _variables.Errors.Add($"Arquivo {originalFileName} não encontrado no ZIP.");
            _variables.taskLogger_.LogError($"Arquivo {originalFileName} não encontrado no ZIP.");
            return;
        }

        if (!await TryExtractEntryAsync(entry, newFilePath))
        {
            // BUGFIX: the original retried forever on a persistent IOException.
            _variables.Errors.Add($"Arquivo {originalFileName} não pôde ser extraído para {newFilePath}.");
            _variables.taskLogger_.LogError($"Arquivo {originalFileName} não pôde ser extraído para {newFilePath}.");
            return;
        }
    }

    // First pass: collect the expiry-date range so the ticker queries can be bounded.
    HashSet<DateTime> dates = CollectExpiryDates(newFilePath);
    if (dates.Count == 0)
    {
        // BUGFIX: dates.Min()/Max() throw InvalidOperationException on an empty set.
        _variables.taskLogger_.LogError($"Nenhuma linha 02/03 encontrada em {newFilePath}.");
        return;
    }

    double minDate = _variables.context_.DateTimeToExcelDate(dates.Min());
    double maxDate = _variables.context_.DateTimeToExcelDate(dates.Max());

    var tickersFilter = new BsonDocument
    {
        { Fields.TickersFields.ExpiryDate, new BsonDocument { { "$gte", minDate }, { "$lte", maxDate } } },
        { Fields.TickersFields.Strike, new BsonDocument { { "$exists", true } } },
        { Fields.TickersFields.ProductClass, "EQ OPT" }
    };
    Dictionary<string, object> tickersDict = _variables.context_.GetTickers("Tickers", tickersFilter);

    var histTickersFilter = new BsonDocument
    {
        { Fields.TickersFields.ExpiryDate, new BsonDocument { { "$gte", minDate }, { "$lte", maxDate } } },
        { Fields.TickersFields.Payout, new BsonDocument { { "$exists", true } } }
    };
    Dictionary<string, object> histTickersDict = _variables.context_.GetTickers("HistoricalTickers", histTickersFilter);

    var insertProceeds = new List<WriteModel<BsonDocument>>();
    var deleteProceeds = new List<WriteModel<BsonDocument>>();
    Task<BulkWriteResult<BsonDocument>> pendingBulkWrite = null;

    // Second pass: process each option row, flushing insert batches as they fill up.
    foreach (string row in File.ReadLines(newFilePath))
    {
        string[] contents = row.Split('|');
        if (contents[0] == "02" || contents[0] == "03")
        {
            ProcessOptionRow(contents, tickersDict, histTickersDict, insertProceeds, deleteProceeds);
        }

        if (insertProceeds.Count >= BulkBatchSize)
        {
            if (pendingBulkWrite != null)
            {
                // BUGFIX: was bulkInsert.Wait() — blocking inside an async method
                // wastes a thread and risks deadlock; await instead.
                await pendingBulkWrite;
            }
            pendingBulkWrite = _variables.context_.BulkWriteInsertEQProceeds(insertProceeds);
            insertProceeds = new List<WriteModel<BsonDocument>>();
        }
    }

    // BUGFIX: the original never awaited the last in-flight batch, so the method could
    // return mid-write and any bulk-write exception was silently dropped.
    if (pendingBulkWrite != null)
    {
        await pendingBulkWrite;
    }
    if (insertProceeds.Count > 0)
    {
        await _variables.context_.BulkWriteInsertEQProceeds(insertProceeds);
    }
    if (deleteProceeds.Count > 0)
    {
        // BUGFIX: delete models were accumulated but never executed in the original.
        // NOTE(review): assumes BulkWriteInsertEQProceeds forwards arbitrary WriteModels
        // (including DeleteOneModel) to the driver's BulkWriteAsync — confirm.
        await _variables.context_.BulkWriteInsertEQProceeds(deleteProceeds);
    }
}

/// <summary>Batch size for pipelined bulk writes of proceeds write models.</summary>
private const int BulkBatchSize = 500;

/// <summary>
/// Extracts <paramref name="entry"/> to <paramref name="destinationPath"/>, retrying
/// while the destination file is locked by another process.
/// </summary>
/// <returns><c>true</c> on success; <c>false</c> after exhausting retries.</returns>
private static async Task<bool> TryExtractEntryAsync(ZipArchiveEntry entry, string destinationPath)
{
    const int maxAttempts = 50; // bounded, unlike the original infinite retry loop
    for (int attempt = 0; attempt < maxAttempts; attempt++)
    {
        try
        {
            // BUGFIX: the original never disposed the stream returned by entry.Open().
            using (Stream source = entry.Open())
            using (var destination = new FileStream(destinationPath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true))
            {
                await source.CopyToAsync(destination);
                return true;
            }
        }
        catch (IOException)
        {
            // BUGFIX: was Thread.Sleep(100) — never block the thread in async code.
            await Task.Delay(100);
        }
    }
    return false;
}

/// <summary>
/// First pass over the extracted file: collects the distinct expiry dates of all
/// "02" and "03" rows. The raw field is encoded <c>yyyyMMdd</c> in its first 8 chars.
/// </summary>
private static HashSet<DateTime> CollectExpiryDates(string filePath)
{
    var dates = new HashSet<DateTime>();
    foreach (string row in File.ReadLines(filePath))
    {
        string[] contents = row.Split('|');
        // The expiry date lives in column 17 for record type "02" and column 16 for "03".
        int dateColumn;
        if (contents[0] == "02")
        {
            dateColumn = 17;
        }
        else if (contents[0] == "03")
        {
            dateColumn = 16;
        }
        else
        {
            continue;
        }
        // Parse yyyyMMdd directly instead of reshuffling substrings into dd/MM/yyyy first.
        dates.Add(DateTime.ParseExact(contents[dateColumn].Substring(0, 8), "yyyyMMdd", CultureInfo.InvariantCulture));
    }
    return dates;
}

/// <summary>
/// Processes one "02" (equity option) or "03" (index option) row: builds the ticker
/// document, queues strike-adjustment proceeds when the stored strike diverges from
/// the file's strike, and upserts the historical ticker when no matching history
/// entry exists. Shared implementation for both record types — the original
/// duplicated ~100 lines per case, and the "03" copy had its ticker/history logic
/// accidentally nested inside the histTickersDict.ContainsKey block.
/// </summary>
private void ProcessOptionRow(
    string[] contents,
    Dictionary<string, object> tickersDict,
    Dictionary<string, object> histTickersDict,
    List<WriteModel<BsonDocument>> insertProceeds,
    List<WriteModel<BsonDocument>> deleteProceeds)
{
    // Column layout differs between the two record types.
    string payoutCode, asset, tag, style, strikeField, expiryField;
    if (contents[0] == "02")
    {
        payoutCode = contents[2];
        asset = contents[6].Split(' ')[0] + ShareClassSuffix(contents[7].Split(' ')[0]);
        tag = contents[13].Split(' ')[0];
        style = contents[14].Split(' ')[0];
        strikeField = contents[16];
        expiryField = contents[17];
    }
    else // "03"
    {
        payoutCode = contents[1];
        asset = contents[5].Split(' ')[0];
        if (asset == "IBOVESPA")
        {
            asset = "IBOV";
        }
        tag = contents[11].Split(' ')[0];
        style = contents[12].Split(' ')[0];
        strikeField = contents[14];
        expiryField = contents[16];
    }

    string payout = payoutCode == "70" ? "C" : payoutCode == "80" ? "P" : "";
    string contract = style == "1" ? "AM_OPT" : style == "2" ? "EU_OPT" : "";
    double strike = Math.Round(Convert.ToDouble(strikeField, CultureInfo.InvariantCulture), 2);
    DateTime expiry = DateTime.ParseExact(expiryField.Substring(0, 8), "yyyyMMdd", CultureInfo.InvariantCulture);

    var bo = new BsonDocument
    {
        { Fields.TickersFields.Tag, tag },
        { Fields.TickersFields.ProductAlias, asset + "_" + contract },
        { Fields.TickersFields.Strike, strike },
        { Fields.TickersFields.ExpiryDate, _variables.context_.DateTimeToExcelDate(expiry) },
        { Fields.TickersFields.Payout, payout },
        { Fields.TickersFields.ProductClass, "EQ OPT" }
    };

    // Current ticker with the same tag AND expiry, if any.
    // TryGetValue avoids the ContainsKey + indexer double lookup of the original.
    Dictionary<string, object> ticker = null;
    if (tickersDict.TryGetValue(tag, out object rawTicker)
        && rawTicker is Dictionary<string, object> candidate
        && bo[Fields.TickersFields.ExpiryDate].Equals(candidate[Fields.TickersFields.ExpiryDate]))
    {
        ticker = candidate;
    }

    // Does history already contain this tag/expiry combination?
    bool hasHistory = histTickersDict.TryGetValue(tag, out object rawHist)
        && rawHist is Dictionary<string, object> histDict
        && histDict.TryGetValue(Fields.TickersFields.ExpiryDate, out object histExpiry)
        && histExpiry.Equals(bo[Fields.TickersFields.ExpiryDate]);

    if (ticker != null
        && Math.Abs(Convert.ToDouble(ticker[Fields.TickersFields.Strike]) - strike) > 1e-4)
    {
        QueueStrikeAdjustmentProceeds(bo, asset, tag, strike,
            Convert.ToDouble(ticker[Fields.TickersFields.Strike]),
            insertProceeds, deleteProceeds);
    }

    if (!hasHistory)
    {
        BsonDocument newTicker = BsonExtensions.DeepCopy(bo);
        newTicker[Fields.TickersFields.LastChangeDate] = _variables.excelDate;
        var filter = new BsonDocument
        {
            { Fields.TickersFields.Tag, newTicker[Fields.TickersFields.Tag] },
            { Fields.TickersFields.LastChangeDate, newTicker[Fields.TickersFields.LastChangeDate] }
        };
        _variables.context_.UpdateHistTickers(filter, new BsonDocument { { "$set", newTicker } });
    }
}

/// <summary>Maps a B3 share-class code (ON, PN, ...) to the numeric ticker suffix.</summary>
private static string ShareClassSuffix(string assetType)
{
    switch (assetType)
    {
        case "ON": return "3";
        case "PN": return "4";
        case "PNA": return "5";
        case "PNB": return "6";
        case "PNC": return "7";
        case "PND": return "8";
        case "UNT":
        case "CI": return "11";
        default: return "";
    }
}

/// <summary>
/// Builds the proceeds insert/delete write models for a strike adjustment (stored
/// strike differs from the file's strike) and appends them to the pending batches.
/// BUGFIX vs. original "03" path: it ignored the computed ratio/dividendType
/// (hardcoding 1 / "CashDividend"), deleted with the dividends query filter instead
/// of a proceeds filter, and bypassed batching with per-row inserts.
/// </summary>
private void QueueStrikeAdjustmentProceeds(
    BsonDocument bo,
    string asset,
    string tag,
    double adjustedStrike,
    double originalStrike,
    List<WriteModel<BsonDocument>> insertProceeds,
    List<WriteModel<BsonDocument>> deleteProceeds)
{
    // Look up today's stock-dividend/split event for this asset to get the adjustment ratio.
    var dividendFilter = new BsonDocument
    {
        { Fields.DividendsFields.SecurityLabel, asset },
        { Fields.DividendsFields.ExDate, _variables.excelDate },
        { Fields.DividendsFields.Desk, "PROP" },
        { Fields.DividendsFields.DividendType, new BsonDocument { { "$in", new BsonArray { "StockDividend", "Split" } } } }
    };
    var actualDividendList = _variables.context_.GetEQDividendsQuery(dividendFilter);

    double ratio = 1.0;
    string dividendType = "CashDividend";
    if (actualDividendList.Count() != 0)
    {
        BsonDocument actualDvd = actualDividendList[0];
        ratio = actualDvd[Fields.DividendsFields.Ratio].ToDouble();
        dividendType = actualDvd[Fields.DividendsFields.DividendType].AsString;
    }

    var proceeds = new BsonDocument
    {
        { Fields.ProceedsFields.Ratio, ratio },
        { Fields.ProceedsFields.ExpiryDate, bo[Fields.TickersFields.ExpiryDate].ToString() },
        { Fields.ProceedsFields.AdjustedStrike, adjustedStrike },
        { Fields.ProceedsFields.ExDate, _variables.excelDate },
        { Fields.ProceedsFields.OriginalStrike, originalStrike },
        { Fields.ProceedsFields.ProductAlias, bo[Fields.TickersFields.ProductAlias].ToString() },
        { Fields.ProceedsFields.Ticker, tag },
        { Fields.ProceedsFields.DividendType, dividendType },
    };
    // Delete any previous proceeds document for the same key before re-inserting.
    var deleteFilter = new BsonDocument
    {
        { Fields.ProceedsFields.ProductAlias, proceeds[Fields.ProceedsFields.ProductAlias] },
        { Fields.ProceedsFields.ExDate, proceeds[Fields.ProceedsFields.ExDate] },
        { Fields.ProceedsFields.Ticker, proceeds[Fields.ProceedsFields.Ticker] },
        { Fields.ProceedsFields.OriginalStrike, proceeds[Fields.ProceedsFields.OriginalStrike] },
        { Fields.ProceedsFields.ExpiryDate, proceeds[Fields.ProceedsFields.ExpiryDate] },
        { Fields.ProceedsFields.DividendType, proceeds[Fields.ProceedsFields.DividendType] }
    };
    insertProceeds.Add(new InsertOneModel<BsonDocument>(proceeds));
    deleteProceeds.Add(new DeleteOneModel<BsonDocument>(deleteFilter));
}
I read some information about implementing parallelism in loops, but I didn't quite understand in which cases I should create a ConcurrentBag instead of a list (whether I should transform all lists - both for reading and editing - into ConcurrentBag or only the lists that are edited during line manipulation), and if there are other parameters that should be balanced to optimize the efficiency vs. memory relationship.
Besides the parallelism issue, is there anything else that can be done to make this faster? Running the code in debug mode, I noticed an inconsistency in the time it takes to populate the list used in `bulkWrite`, which sometimes happens in 625ms and other times takes more than 10000ms. Is there any way to reduce these fluctuations?
`ConcurrentBag<T>` is an extremely specialized collection that is intended for rare mixed producer-consumer scenarios. In this case it's most likely the least suitable concurrent collection. Apart from that, the comments are intended for asking for clarifications about the question, or suggesting improvements for the question. We are not supposed to answer the question in the comments. a) The `histTickersDict` values are `object`, but you treat them as dictionaries. Why not make the values of `histTickersDict` be dictionaries instead? b) Most of your uses of `ContainsKey` should be `TryGetValue`. c) Separate file processing from data processing. A simple starting point: read from the file and push individual lines (or maybe groups of 10 lines) into a `BlockingCollection`. Then have one or more consumers reading from that `BlockingCollection`. If you do this, you may need to change some data types (e.g. `HashSet` is not thread safe).