1

I am trying to parallelize my code using HPX in order to improve performance. Below is the original code and my attempt to refactor it using HPX.

Original Code:

std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data){
    std::string output = "";
    Timer timer;
    uint_t n = 0;
    unsigned char* concat_data = concat_strings(data, n); 
    uint_t *SA = NULL;
    SA = (uint_t*) malloc(n*sizeof(uint_t));
    int_t *LCP = NULL;
    LCP = (int_t*) malloc(n*sizeof(int_t));
    int32_t *DA = NULL;
    DA = (int32_t*) malloc(n*sizeof(int32_t));

    timer.reset();
    gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
    double suffix_construction_time = timer.elapsed_time();

    timer.reset();
    int_t min_mem_length = global_args.min_mem_length;
    int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
    std::vector<uint_t> joined_sequence_bound;
    uint_t total_length = 0;
    for (uint_t i = 0; i < data.size(); i++) {
        joined_sequence_bound.push_back(total_length);
        total_length += data[i].length() + 1;
    }
    // Find all intervals with an LCP >= min_mem_length and <= min_cross_sequence
    std::vector<std::pair<uint_t, uint_t>> intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);

    free(LCP);

    uint_t interval_size = intervals.size();

    std::vector<mem> mems;
    mems.resize(interval_size);
    // Convert each interval to a MEM in parallel
    IntervalToMemConversionParams* params = new IntervalToMemConversionParams[interval_size];
#if (defined(__linux__))
    threadpool pool;
    threadpool_init(&pool, global_args.thread);
    for (uint_t i = 0; i < interval_size; i++) {
        params[i].SA = SA;
        params[i].DA = DA;
        params[i].interval = intervals[i];
        params[i].concat_data = concat_data;
        params[i].result_store = mems.begin() + i;
        params[i].min_mem_length = min_mem_length;
        params[i].joined_sequence_bound = joined_sequence_bound;

        threadpool_add_task(&pool, interval2mem, params+i);
    }
    threadpool_destroy(&pool);
#else
#pragma omp parallel for num_threads(global_args.thread)
    for (uint_t i = 0; i < interval_size; i++) {
        params[i].SA = SA;
        params[i].DA = DA;
        params[i].interval = intervals[i];
        params[i].concat_data = concat_data;
        params[i].result_store = mems.begin() + i;
        params[i].min_mem_length = min_mem_length;
        params[i].joined_sequence_bound = joined_sequence_bound;
        interval2mem(params + i);
    }
#endif

    if (mems.size() <= 0 && global_args.verbose) {
        output = "Warning: There is no MEMs, please adjust your paramters.";
        print_table_line(output);
       
    }

    // Sort the MEMs based on their average positions and assign their indices
    sort_mem(mems, data);

    free(SA);
    free(DA);
    free(concat_data);
    delete[] params;

    uint_t sequence_num = data.size();
    std::vector<std::vector<std::pair<int_t, int_t>>> split_point_on_sequence;
    if (global_args.filter_mode == "global") {
        split_point_on_sequence = filter_mem_fast(mems, sequence_num);
    }
    else {
        split_point_on_sequence = filter_mem_accurate(mems, sequence_num);
    }
    
    global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / pow(2, 20);
    double mem_process_time = timer.elapsed_time();
   return split_point_on_sequence;
}

My Attempt Using HPX:

std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data) {
    Timer timer;
    uint_t n = 0;
    // Task 1: string concatenation
    auto concat_task = hpx::dataflow(hpx::launch::async, [&]() {
        return concat_strings(data, n);
    });

    // Task 2: Suffix Array and LCP
    auto gsacak_task = hpx::dataflow(hpx::launch::async, [&](auto concat_data_future) {
        unsigned char* concat_data = concat_data_future.get();
        uint_t *SA = NULL;
        SA = (uint_t*) malloc(n*sizeof(uint_t));
        int_t *LCP = NULL;
        LCP = (int_t*) malloc(n*sizeof(int_t));
        int32_t *DA = NULL;
        DA = (int32_t*) malloc(n*sizeof(int32_t));

        timer.reset();
        gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
        return std::make_tuple(SA, LCP, DA, concat_data, n);
    }, concat_task);

    // Task 3: LCP Intervals
    auto intervals_task = hpx::dataflow(hpx::launch::async, [&](auto gsacak_result) {
        auto [SA, LCP, DA, concat_data, n] = gsacak_result.get();
        
        timer.reset();
        int_t min_mem_length = global_args.min_mem_length;
        int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
        auto intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);
        free(LCP); // LCP não é mais necessário
        return std::make_tuple(intervals, SA, DA, concat_data);
    }, gsacak_task);

    // Task 4: Intervals to MEM
    auto mems_task = hpx::dataflow(hpx::launch::async, [&](auto intervals_result) {
        auto [intervals, SA, DA, concat_data] = intervals_result.get();

        uint_t interval_size = intervals.size();
        std::vector<mem> mems(interval_size);
        int_t min_mem_length = global_args.min_mem_length;

        std::vector<uint_t> joined_sequence_bound;
        uint_t total_length = 0;
        for (auto& seq : data) {
            joined_sequence_bound.push_back(total_length);
            total_length += seq.length() + 1;
        }

        hpx::experimental::for_loop(hpx::execution::par, 0, interval_size, [&](int i) {
            IntervalToMemConversionParams params{
                .SA = SA,
                .DA = DA,
                .concat_data = concat_data,
                .result_store = mems.begin() + i,
                .min_mem_length = min_mem_length,
                .interval = intervals[i],
                .joined_sequence_bound = joined_sequence_bound
            };
            interval2mem(&params);
        });

        free(SA);
        free(DA);
        free(concat_data);
        return mems;
    }, intervals_task);

    // Task 5: MEM Filtering
    auto filter_task = hpx::dataflow(hpx::launch::async, [&](auto mems_future) {
        auto mems = mems_future.get();
        sort_mem(mems, data);
        return (global_args.filter_mode == "global")
            ? filter_mem_fast(mems, data.size())
            : filter_mem_accurate(mems, data.size());
    }, mems_task);


    // Final Result
    auto split_point_on_sequence = filter_task.get();
    global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / (1 << 20);
    return split_point_on_sequence;
}

Issue:

My goal is to parallelize parts of this function using HPX, hoping for performance improvement. However, after refactoring, I noticed that the gsacak function (which is written in C and cannot be modified) takes significantly longer to execute. In the original code, it takes approximately 0.99 seconds to execute and completes the function in 1.012 seconds. After my changes, the execution time for gsacak has increased to 2.29 seconds, and the function finishes in 2.31 seconds.

Questions:

Am I approaching this incorrectly? I am using HPX to parallelize parts of the code, but the performance gain is not as expected. What can I do to make the execution time of the refactored version comparable to the original code, since I have not modified the gsacak function, only the way it is called?

Context:

I am learning to work with HPX/C++ and need assistance with parallelization in this context. The performance degradation is only observed with the change in how tasks are scheduled using HPX; the original code works fine without parallelization. I would appreciate any suggestions or insights on how to improve the performance of my refactored code.

6
  • Assuming the code is correct (I do not know much about HPX), it could come from memory effects on a NUMA platform of thread-local issues of the underlying C function. For the former, the best would be to run the code on a single NUMA node (maybe you can request HPC to use a single specific core for example to do that). For the later, I think you need to read the code of the C library to check if it support being called from another thread and if its execution is efficient in this case. An alternative solution would be to run it from different thread manually just to check its performance. Commented Jan 24 at 23:44
  • If you provide the specification the computing node used to run the HPC code (exact reference of the CPU + number of sockets), then I can check if NUMA effects are possible on it ;) . Alternatively, lstopo (on Linux) can also help to get information about this (though not perfect). Commented Jan 24 at 23:48
  • 1
    Why are you using mallocs? As for optimizations: Make each thread's workpackage's size a multiple of the size of the cacheline to avoid false sharing. You can use std::hardware_destructive_interference_size to get a rought idea but sometimes the cacheline is smaller and you could get away with half that size. Commented Jan 25 at 0:40
  • 1
    @JérômeRichard, The CPU configuration consists of a single package with 16 cores (from Core L#0 to Core L#15) organized in NUMANode L#0. Each core is paired with its own private caches (L1, L2) and processing units (PUs) like this: L2 L#15 (2048KB) + L1d L#15 (48KB) + L1i L#15 (32KB) + Core L#15, PU L#30 (P#15), PU L#31 (P#31) Commented Jan 25 at 3:28
  • 1
    @tlparolin I was actually wondering about both the original and the new code. Why malloc? Commented Jan 25 at 8:35

1 Answer 1

1

If I understand your HPX code correctly, you're creating five tasks that are interdependent on each other (task2 depends on task1's result, task3 depends on task2's result, etc.). That prevents any parallelization from happening as the respective next task will only start running once the predecessor task has produced its result (is finished running).

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.