0

I have C# code that reads a .TXT file, processes each line, and subsequently inserts documents into a MongoDB database. Since the file contains many lines (approximately 70k), both reading all lines and inserting everything into the database take too long.

To try to optimize this process, I attempted:

  1. Using bulkWrite to speed up the database I/O operations;
  2. Transforming the bulkWrite into an async task, so that the next batch can be populated while the previous one is still being inserted.

After these changes, the core of my code is configured similarly to:

public async Task ProcessTickersAsync()
{
    string tempPath = Path.GetTempPath();
    string today = DateTime.Now.ToString("yyyy-MM-dd");

    string zipFileName = $"ListaCompletaSeriesAutorizadas_{today}.zip";
    string zipFilePath = Path.Combine(tempPath, zipFileName);

    if (File.Exists(zipFilePath))
    {
        string originalFileName = "SI_D_SEDE.txt";
        string newFileName = $"SI_D_SEDE_{DateTime.Now:yyyyMMdd}.txt";
        string newFilePath = Path.Combine(tempPath, newFileName);

        using (System.IO.Compression.ZipArchive archive = ZipFile.OpenRead(zipFilePath))
        {
            System.IO.Compression.ZipArchiveEntry entry = archive.GetEntry(originalFileName);

            if (entry != null)
            {
                bool fileAvailable = false;

                while (!fileAvailable)
                {
                    try
                    {
                        using (FileStream fileStream = new FileStream(newFilePath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true))
                        {
                            await entry.Open().CopyToAsync(fileStream);
                            fileAvailable = true;
                        }
                    }
                    catch (IOException)
                    {
                        Thread.Sleep(100);
                    }
                }

                HashSet<DateTime> dates = new HashSet<DateTime>();

                foreach (var row in File.ReadLines(newFilePath))
                {
                    string[] contents = row.Split('|');

                    if (contents[0] == "02")
                    {
                        string formattedDate = contents[17].Substring(6, 2) + "/" + contents[17].Substring(4, 2) + "/" + contents[17].Substring(0, 4);
                        DateTime Date = DateTime.ParseExact(formattedDate, "dd/MM/yyyy", null);
                        dates.Add(Date);
                    }
                    else if (contents[0] == "03")
                    {
                        string formattedDate = contents[16].Substring(6, 2) + "/" + contents[16].Substring(4, 2) + "/" + contents[16].Substring(0, 4);
                        DateTime Date = DateTime.ParseExact(formattedDate, "dd/MM/yyyy", null);
                        dates.Add(Date);
                    }
                };

                double minDate = _variables.context_.DateTimeToExcelDate(dates.Min());
                double maxDate = _variables.context_.DateTimeToExcelDate(dates.Max());

                var tickersfilter = new BsonDocument{{ Fields.TickersFields.ExpiryDate, new BsonDocument{{ "$gte", minDate },{ "$lte", maxDate }}},
                                                { Fields.TickersFields.Strike, new BsonDocument{{ "$exists", true }}},
                                                { Fields.TickersFields.ProductClass, "EQ OPT" }};
                Dictionary<string, object> tickersDict = _variables.context_.GetTickers("Tickers", tickersfilter);
                var histTickersfilter = new BsonDocument{{ Fields.TickersFields.ExpiryDate, new BsonDocument{{ "$gte", minDate },{ "$lte", maxDate }}},
                                                {Fields.TickersFields.Payout, new BsonDocument{{ "$exists", true }}}};

                Dictionary<string, object> histTickersDict = _variables.context_.GetTickers("HistoricalTickers", histTickersfilter);

                List<WriteModel<BsonDocument>> insertProceeds = new List<WriteModel<BsonDocument>>();
                List<WriteModel<BsonDocument>> deleteProceeds = new List<WriteModel<BsonDocument>>();
                List<BsonDocument> histTickersListToInsert = new List<BsonDocument>();
                List<BsonDocument> TickersListToInsert = new List<BsonDocument>();
                Task<BulkWriteResult<BsonDocument>> bulkInsert = null;

                var documentsToInsert = new List<BsonDocument>();
                var filtersToDelete = new List<BsonDocument>();

                foreach (string row in File.ReadLines(newFilePath))
                {
                    var contents = row.Split('|');

                    switch (contents[0])
                    {
                        case "02":
                            {
                                var bo = new BsonDocument();
                                string payout = "";
                                string contract = "";
                                string asset = "";
                                string assetType = "";

                                if (contents[2] == "70")
                                {
                                    payout = "C";
                                }
                                else if (contents[2] == "80")
                                {
                                    payout = "P";
                                }

                                asset = contents[6].Split(' ')[0];
                                assetType = contents[7].Split(' ')[0];

                                switch (assetType)
                                {
                                    case "ON":
                                        asset += "3";
                                        break;

                                    case "PN":
                                        asset += "4";
                                        break;

                                    case "PNA":
                                        asset += "5";
                                        break;

                                    case "PNB":
                                        asset += "6";
                                        break;

                                    case "PNC":
                                        asset += "7";
                                        break;

                                    case "PND":
                                        asset += "8";
                                        break;

                                    case "UNT":
                                        asset += "11";
                                        break;

                                    case "CI":
                                        asset += "11";
                                        break;
                                }

                                string tag = contents[13].Split(' ')[0];
                                var style = contents[14].Split(' ')[0];

                                if (style == "1")
                                {
                                    contract = "AM_OPT";
                                }
                                else if (style == "2")
                                {
                                    contract = "EU_OPT";
                                }

                                double strike = Math.Round(Convert.ToDouble(contents[16], CultureInfo.InvariantCulture), 2);
                                string expiryDate = contents[17].Substring(6, 2) + "/" + contents[17].Substring(4, 2) + "/" + contents[17].Substring(0, 4);
                                bo[Fields.TickersFields.Tag] = tag;
                                bo[Fields.TickersFields.ProductAlias] = asset + "_" + contract;
                                bo[Fields.TickersFields.Strike] = strike;
                                bo[Fields.TickersFields.ExpiryDate] = _variables.context_.DateTimeToExcelDate(DateTime.ParseExact(expiryDate, "dd/MM/yyyy", null));
                                bo[Fields.TickersFields.Payout] = payout;
                                bo[Fields.TickersFields.ProductClass] = "EQ OPT";
                                object ticker = null;
                                string tagString = tag.ToString();

                                if (tickersDict.ContainsKey(tagString) && bo[Fields.TickersFields.ExpiryDate].Equals(((Dictionary<string, object>)tickersDict[tagString])[Fields.TickersFields.ExpiryDate]))
                                {
                                    ticker = tickersDict[tagString];
                                }

                                var historicalList = new List<object>();

                                if (histTickersDict.ContainsKey(tagString))
                                {
                                    var histTicker = histTickersDict[tagString];
                                    var histDict = histTicker as Dictionary<string, object>; ;

                                    if (histDict != null && histDict.ContainsKey(Fields.TickersFields.ExpiryDate))
                                    {
                                        var hTicker = histDict[Fields.TickersFields.ExpiryDate];

                                        if (bo.Contains(Fields.TickersFields.ExpiryDate) && hTicker.Equals(bo[Fields.TickersFields.ExpiryDate]))
                                        {
                                            historicalList.Add(histDict);
                                        }
                                    }
                                }

                                if (ticker != null && ticker is Dictionary<string, object> tickerDict && Math.Abs(Convert.ToDouble(tickerDict[Fields.TickersFields.Strike]) - strike) > 1e-4)
                                {
                                    var filter = new BsonDocument
                                        {
                                        { Fields.DividendsFields.SecurityLabel, asset },
                                        { Fields.DividendsFields.ExDate, _variables.excelDate },
                                        { Fields.DividendsFields.Desk, "PROP" },
                                        { Fields.DividendsFields.DividendType, new BsonDocument { { "$in", new BsonArray { "StockDividend", "Split" } } } }
                                        };
                                    var actualDividendList = _variables.context_.GetEQDividendsQuery(filter);
                                    double ratio = 1.0;
                                    string dividendType = "CashDividend";
                                    if (actualDividendList.Count() != 0)
                                    {
                                        BsonDocument actualDvd = actualDividendList[0];
                                        ratio = actualDvd[Fields.DividendsFields.Ratio].ToDouble();
                                        dividendType = actualDvd[Fields.DividendsFields.DividendType].AsString;
                                    }

                                    var bob = new BsonDocument
                                    {
                                    { Fields.ProceedsFields.Ratio, ratio },
                                    { Fields.ProceedsFields.ExpiryDate, bo[Fields.TickersFields.ExpiryDate].ToString() },
                                    { Fields.ProceedsFields.AdjustedStrike, strike },
                                    { Fields.ProceedsFields.ExDate, _variables.excelDate },
                                    { Fields.ProceedsFields.OriginalStrike, Convert.ToDouble(tickerDict[Fields.TickersFields.Strike]) },
                                    { Fields.ProceedsFields.ProductAlias, bo[Fields.TickersFields.ProductAlias].ToString() },
                                    { Fields.ProceedsFields.Ticker, tag },
                                    { Fields.ProceedsFields.DividendType, dividendType },
                                    };
                                    var deleteFilter = new BsonDocument
                                    {
                                    { Fields.ProceedsFields.ProductAlias , bob[Fields.ProceedsFields.ProductAlias] },
                                    { Fields.ProceedsFields.ExDate , bob[Fields.ProceedsFields.ExDate] },
                                    { Fields.ProceedsFields.Ticker , bob[Fields.ProceedsFields.Ticker] },
                                    { Fields.ProceedsFields.OriginalStrike , bob[Fields.ProceedsFields.OriginalStrike] },
                                    { Fields.ProceedsFields.ExpiryDate , bob[Fields.ProceedsFields.ExpiryDate] },
                                    { Fields.ProceedsFields.DividendType , bob[Fields.ProceedsFields.DividendType] }
                                    };
                                    insertProceeds.Add(new InsertOneModel<BsonDocument>(bob));
                                    deleteProceeds.Add(new DeleteOneModel<BsonDocument>(deleteFilter));
                                }
                                if (historicalList.Count() == 0)
                                {
                                    BsonDocument newTicker = BsonExtensions.DeepCopy(bo);
                                    newTicker[Fields.TickersFields.LastChangeDate] = _variables.excelDate;

                                    var filter = new BsonDocument
                                    {
                                        { Fields.TickersFields.Tag, newTicker[Fields.TickersFields.Tag] },
                                        { Fields.TickersFields.LastChangeDate, newTicker[Fields.TickersFields.LastChangeDate] }
                                    };
                                    var set = new BsonDocument
                                    {
                                        { "$set", newTicker }
                                    };
                                    _variables.context_.UpdateHistTickers(filter, set);
                                }
                            }
                            break;
                        case "03":
                            {
                                var bo = new BsonDocument();
                                string payout = "";
                                string contract = "";
                                if (contents[1] == "70")
                                {
                                    payout = "C";
                                }
                                else if (contents[1] == "80")
                                {
                                    payout = "P";
                                }
                                string asset = contents[5].Split(' ')[0];
                                if (asset == "IBOVESPA")
                                {
                                    asset = "IBOV";
                                }
                                string tag = contents[11].Split(' ')[0];
                                double strike = Math.Round(Convert.ToDouble(contents[14], CultureInfo.InvariantCulture), 2);
                                var style = contents[12].Split(' ')[0];
                                if (style == "1")
                                {
                                    contract = "AM_OPT";
                                }
                                else if (style == "2")
                                {
                                    contract = "EU_OPT";
                                }
                                string expiryDate = contents[16].Substring(6, 2) + "/" + contents[16].Substring(4, 2) + "/" + contents[16].Substring(0, 4);
                                bo[Fields.TickersFields.Tag] = tag;
                                bo[Fields.TickersFields.ProductAlias] = asset + "_" + contract;
                                bo[Fields.TickersFields.Strike] = strike;
                                bo[Fields.TickersFields.ExpiryDate] = _variables.context_.DateTimeToExcelDate(DateTime.ParseExact(expiryDate, "dd/MM/yyyy", null));
                                bo[Fields.TickersFields.Payout] = payout;
                                bo[Fields.TickersFields.ProductClass] = "EQ OPT";
                                object ticker = null;
                                string tagString = tag.ToString();
                                if (tickersDict.ContainsKey(tagString) && bo[Fields.TickersFields.ExpiryDate].Equals(((Dictionary<string, object>)tickersDict[tagString])[Fields.TickersFields.ExpiryDate]))
                                {
                                    ticker = tickersDict[tagString];
                                }
                                var historicalList = new List<object>();
                                if (histTickersDict.ContainsKey(tagString))
                                {
                                    var histTicker = histTickersDict[tagString];
                                    var histDict = histTicker as Dictionary<string, object>;

                                    if (histDict != null && histDict.ContainsKey(Fields.TickersFields.ExpiryDate))
                                    {
                                        var hTicker = histDict[Fields.TickersFields.ExpiryDate];
                                        if (bo.Contains(Fields.TickersFields.ExpiryDate) && hTicker.Equals(bo[Fields.TickersFields.ExpiryDate]))
                                        {
                                            historicalList.Add(histDict);
                                        }
                                    }
                                    if (ticker != null && ticker is Dictionary<string, object> tickerDict && Math.Abs(Convert.ToDouble(tickerDict[Fields.TickersFields.Strike]) - strike) > 1e-4)
                                    {
                                        var filter = new BsonDocument
                                    {
                                        { Fields.DividendsFields.SecurityLabel, asset },
                                        { Fields.DividendsFields.ExDate, _variables.excelDate },
                                        { Fields.DividendsFields.Desk, "PROP" },
                                        { Fields.DividendsFields.DividendType, new BsonDocument { { "$in", new BsonArray { "StockDividend", "Split" } } } }
                                    };
                                        var actualDividendList = _variables.context_.GetEQDividendsQuery(filter);
                                        double ratio = 1.0;
                                        string dividendType = "CashDividend";
                                        if (actualDividendList.Count() != 0)
                                        {
                                            BsonDocument actualDvd = actualDividendList[0];
                                            ratio = actualDvd[Fields.DividendsFields.Ratio].ToDouble();
                                            dividendType = actualDvd[Fields.DividendsFields.DividendType].AsString;
                                        }
                                        var bob = new BsonDocument
                                    {
                                        { Fields.ProceedsFields.Ratio, 1},
                                        { Fields.ProceedsFields.ExpiryDate, bo[Fields.TickersFields.ExpiryDate].ToString() },
                                        { Fields.ProceedsFields.AdjustedStrike, strike },
                                        { Fields.ProceedsFields.ExDate, _variables.excelDate },
                                        { Fields.ProceedsFields.OriginalStrike, Convert.ToDouble(tickerDict[Fields.TickersFields.Strike]) },
                                        { Fields.ProceedsFields.ProductAlias, bo[Fields.TickersFields.ProductAlias].ToString() },
                                        { Fields.ProceedsFields.Ticker, tag },
                                        { Fields.ProceedsFields.DividendType, "CashDividend"},
                                    };
                                        _variables.context_.InsertEQProceeds(bob);
                                        _variables.context_.DeleteEQProceeds(filter);
                                    }
                                    if (historicalList.Count() == 0)
                                    {
                                        BsonDocument newTicker = BsonExtensions.DeepCopy(bo);
                                        newTicker[Fields.TickersFields.LastChangeDate] = _variables.excelDate;

                                        var filter = new BsonDocument
                                    {
                                        { Fields.TickersFields.Tag, newTicker[Fields.TickersFields.Tag] },
                                        { Fields.TickersFields.LastChangeDate, newTicker[Fields.TickersFields.LastChangeDate] }
                                    };
                                        var set = new BsonDocument
                                    {
                                        { "$set", newTicker }
                                    };
                                        _variables.context_.UpdateHistTickers(filter, set);
                                    }
                                }
                            }
                            break;
                    }

                    if (insertProceeds.Count >= 500 )
                    {
                        if (bulkInsert != null)
                            bulkInsert.Wait();
                        bulkInsert = _variables.context_.BulkWriteInsertEQProceeds(insertProceeds);
                        insertProceeds = new List<WriteModel<BsonDocument>>();
                    }
                };

                if (insertProceeds.Count > 0 || deleteProceeds.Count > 0)
                {
                    _variables.context_.BulkWriteInsertEQProceeds(insertProceeds);
                }


            }

            else
            {
                _variables.Errors.Add($"Arquivo {originalFileName} não encontrado no ZIP.");
                _variables.taskLogger_.LogError($"Arquivo {originalFileName} não encontrado no ZIP.");
                return;
            }
        }
    }
    else
    {
        _variables.Errors.Add($"Arquivo {zipFileName} não encontrado no caminho {tempPath}.");
        _variables.taskLogger_.LogError($"Arquivo {zipFileName} não encontrado no caminho {tempPath}.");
        //NullReferenceException //algo

    }
}

I read some information about implementing parallelism in loops, but I didn't quite understand in which cases I should create a ConcurrentBag instead of a list (whether I should transform all lists - both for reading and editing - into ConcurrentBag or only the lists that are edited during line manipulation), and if there are other parameters that should be balanced to optimize the efficiency vs. memory relationship.

Besides the parallelism issue, is there anything else that can be done to make this faster? Running the code in debug mode, I noticed an inconsistency in the time it takes to populate the list used in bulk Write, which sometimes happens in 625ms, and other times takes more than 10000ms. Is there any way to reduce these fluctuations?

9
  • Start by reading about ConcurrentBag. After understanding how it works, you can change your foreach into Parallel.ForEach. Pretty broad though, you don't really have a specific issue. Commented Apr 28 at 18:05
  • 1
    You may consider to use the mongoimport. I think it would import these 70k rows in less than one second. Commented Apr 28 at 18:09
  • 1
    I would suggest presenting the original version of your code, that processed each line sequentially, and ask for advice about how to parallelize it. Presenting a half-baked/flawed parallelization attempt and asking us how to fix it, isn't going to be productive. Commented Apr 28 at 18:18
  • @djv the ConcurrentBag<T> is an extremely specialized collection, that is intended for rare mixed producer-consumer scenarios. In this case it's most likely the least suitable concurrent collection. Apart from that, the comments are intended for asking for clarifications about the question, or suggesting improvements for the question. We are not supposed to answer the question in the comments. Commented Apr 28 at 18:27
  • 1
    Some very simple things to consider. a) histTickersDict values are object but you treat them as dictionary. Why not make the values of histTickersDict be dictionaries instead? b) Most of your use of ContainsKey should be TryGetValue. c) Separate file processing from data processing. A simple starting point - read from the file and push individual lines (or maybe a group of 10 lines) into a BlockingCollection. Then have one or more consumers reading from that BlockingCollection. If you do this, you may need to change some data types (e.g. HashSet is not thread safe). Commented Apr 29 at 1:20

1 Answer 1

0

My suggestion is to abstract the parallelization mechanism from the processing logic. Mixing an already complex logic with synchronization primitives can become quite ugly. In another answer I posted a ParallelizeTwoActions method that invokes two actions for each element in a sequence, parallelizing the action1 and the action2 for two subsequent elements. In that case the two actions were independent, but in your case it seems that you want the result of the first action as input for the second action. So here is a slightly modified version of the same mechanism:

public static async Task ParallelizeTwoActions<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, TResult> action1,
    Action<TSource, TResult> action2)
{
    Task<TResult> task1 = Task.FromResult<TResult>(default);
    Task task2 = Task.CompletedTask;
    try
    {
        foreach (TSource item in source)
        {
            task1 = Task.Run(() => action1(item));
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
            TResult result = task1.Result;
            task2 = Task.Run(() => action2(item, result));
        }
    }
    finally
    {
        await Task.WhenAll(task1, task2).ConfigureAwait(false);
    }
}

You could use the ParallelizeTwoActions like this:

ParallelizeTwoActions(File.ReadLines(newFilePath).Chunk(500), lines =>
{
    // First action
    // Project the lines to BSON documents
    return lines.Select(line =>
    {
        BsonDocument document = ParseLine(line);
        return document;
    }).ToArray();
}, (lines, documents) =>
{
    // Second action
    // Insert the documents in the database
    BulkWriteInsertEQProceeds(documents);
}).Wait();

The Chunk is a LINQ operator that takes a sequence of elements, and returns a sequence of arrays. Each array has the specified number of elements.

The type of the parameter lines in the first action is string[], and the type of the parameter documents in the second action is BsonDocument[].

The above is just an example. You may have to modify it according to your needs.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.