
Beginner-level synchronization question here. There's a lot of material on how to implement producer-consumer synchronization. I really did try all the variations that I thought made sense, but none of them seem to work, and I've been tackling this for a few days now.

I'm trying to implement producer-consumer code in the form given in this great video (see minute 31:40). It's a two-phase handshake, and I thought it would be right to use it for the general case.
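A minimal sketch of such a two-phase handshake, reduced to a single process. It rests on two assumptions. First, an int slot stands in for the output file. Second, runHandshake, slot and numItems are illustrative names, not taken from the video. The producer follows the same order as the code below: it produces, raises ready, and then waits until the consumer lowers it again. Both loops run a known number of times, so neither side needs a while(1).

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Two-phase handshake: the producer raises `ready` after filling the slot,
// the consumer lowers it after emptying the slot. Both loops are bounded.
std::vector<int> runHandshake(int numItems) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;      // true while the slot holds an unconsumed item
    int slot = 0;            // stand-in for the shared output file
    std::vector<int> received;

    std::thread producer([&] {
        for (int i = 0; i < numItems; ++i) {
            std::unique_lock<std::mutex> lk(m);
            slot = i;                                  // produce
            ready = true;                              // phase 1: announce it
            lk.unlock();
            cv.notify_one();
            lk.lock();
            cv.wait(lk, [&] { return !ready; });       // wait for phase 2
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i < numItems; ++i) {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return ready; });        // wait for phase 1
            received.push_back(slot);                  // consume
            ready = false;                             // phase 2: slot is free
            lk.unlock();
            cv.notify_one();
        }
    });

    producer.join();
    consumer.join();
    return received;
}
```

runHandshake(5) should return the items 0 to 4 in order. The producer cannot overwrite the slot until the consumer has cleared ready.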

I created a minimal reproducible example. To run it, substitute your own paths for OUTPUTFILEPATH (it appears twice) and BINARYPATH. It seems to give a deterministic (and incorrect) outcome. It's definitely not a correct implementation, hence this post :)
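One way to supply the placeholders is as string constants. The producer builds its command as BINARYPATH + std::to_string(innerRuns), so BINARYPATH needs a trailing space for the run number to arrive as a separate argument (argv[1] in the simulation executable). The simulation executable is a separate program, so it needs its own copy of OUTPUTFILEPATH. The paths and the buildCommand helper below are hypothetical.

```cpp
#include <string>

// Hypothetical placeholder values; substitute your own paths.
const std::string OUTPUTFILEPATH = "/tmp/simulation_output.txt";
// Trailing space: the run number is appended directly after this string,
// and the simulation executable reads it as a separate argument (argv[1]).
const std::string BINARYPATH = "/path/to/simulation ";

// Hypothetical helper repeating the concatenation in RunSimulationOnce.
std::string buildCommand(int innerRuns) {
    return BINARYPATH + std::to_string(innerRuns);
}
```

With these values, buildCommand(3) yields "/path/to/simulation 3".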

Header.h:

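#pragma once

#include <cstdlib>    // std::system
#include <iostream>
#include <fstream>
#include <memory>     // std::shared_ptr, std::make_shared
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>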

Simulator.h:

#include "Header.h"

extern std::mutex gSyncOutputReading_mutex;
extern std::condition_variable gSyncOutputReading_cv;
extern bool gSyncOutputReading_ready;
extern int readOnce;

class Simulator {
    public:
    
    std::thread threadRunSimulationOnce(int innerRuns) {
        // Explicit capture: implicitly capturing this via [=] is deprecated since C++20.
        return std::thread([this, innerRuns] { RunSimulationOnce(innerRuns); });
    }
    
    /* Producer */
    void RunSimulationOnce(int innerRuns);
};

Route.h:

#include "Header.h"

extern std::mutex gSyncOutputReading_mutex;
extern std::condition_variable gSyncOutputReading_cv;
extern bool gSyncOutputReading_ready;
extern int readOnce;

class Route {
    public:
    std::string data;

    std::thread threadReadInput() {
        // Explicit capture: implicitly capturing this via [=] is deprecated since C++20.
        return std::thread([this] { readInput(); });
    }
    
    /* Consumer */
    void readInput();
};

Simulator is a class that holds simulation-related metadata. It has a method that runs a simulation executable (source code given below) from the command line. In the original project a single run takes about 400 ms, and at the end it updates a flat output file that contains a new route. Route is a class for handling the routes themselves. It has a method that reads in the data that a simulation run wrote to that output file.

Simulator.cpp (Producer):

#include "Simulator.h"

/* Producer */
void Simulator::RunSimulationOnce(int innerRuns) {
    //bool produced = false;
    //while(produced == false) {
    while (!readOnce) {
        // The lock stays held for the whole external simulation run.
        std::unique_lock<std::mutex> ul(gSyncOutputReading_mutex);

        std::string COMMAND = BINARYPATH + std::to_string(innerRuns);
        std::system(COMMAND.c_str());

        gSyncOutputReading_ready = true;
        ul.unlock();
        gSyncOutputReading_cv.notify_one();
        ul.lock();
        gSyncOutputReading_cv.wait(ul, [](){ return gSyncOutputReading_ready == false; });
        //produced = true;
    }
    readOnce = 0;
}
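This producer holds gSyncOutputReading_mutex for the whole std::system call. The ready flag on its own already guarantees that the consumer does not touch the output while ready is false, so the slow step can run without the lock.

A minimal sketch under that assumption. A hypothetical simulateRun writes a std::string in place of the external executable and its output file. The mutex is only taken to flip ready and to wait.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for std::system(COMMAND): writes one "route"
// (15 copies of the run digit) into the shared output buffer.
void simulateRun(int run, std::string& output) {
    output = std::string(15, static_cast<char>('0' + run % 10));
}

std::vector<std::string> runWithUnlockedWork(int numRuns) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    std::string output;                  // stand-in for the output file
    std::vector<std::string> routes;

    std::thread producer([&] {
        for (int run = 0; run < numRuns; ++run) {
            // No lock held: ready is false here, so the consumer is not
            // reading `output` while the slow work overwrites it.
            simulateRun(run, output);
            {
                std::lock_guard<std::mutex> guard(m);
                ready = true;            // publish the finished result
            }
            cv.notify_one();
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return !ready; });   // wait until it was read
        }
    });

    std::thread consumer([&] {
        for (int run = 0; run < numRuns; ++run) {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return ready; });
            routes.push_back(output);    // "read the file"
            ready = false;
            lk.unlock();
            cv.notify_one();
        }
    });

    producer.join();
    consumer.join();
    return routes;
}
```

The handshake, not the mutex, protects the output during the slow step. This keeps the lock hold time to a few instructions instead of the full simulation run.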

Route.cpp (Consumer):

#include "Route.h"

/* Consumer */
void Route::readInput() {
    //bool consumed = false;
    //while(consumed == false) {
    //while(1) {
    while (!readOnce) {
        std::unique_lock<std::mutex> ul(gSyncOutputReading_mutex);
        gSyncOutputReading_cv.wait(ul, [](){ return gSyncOutputReading_ready; });

        std::ifstream file(OUTPUTFILEPATH, std::ios_base::in);
        if (file.is_open())
        {
            std::string line;
            std::getline(file, line);
            this->data = line;
            file.close();
        }
        else { std::cerr << "Unable to open output file " << std::endl; }

        gSyncOutputReading_ready = false;
        readOnce = 1;
        ul.unlock();
        gSyncOutputReading_cv.notify_one();
        ul.lock();
        //consumed = true;
    }
}

Main:

#include "Header.h"
#include "Simulator.h"
#include "Route.h"

std::mutex gSyncOutputReading_mutex;
std::condition_variable gSyncOutputReading_cv;
bool gSyncOutputReading_ready = false;
int readOnce = 0;

int main()
{ 
    std::vector<std::shared_ptr<Route>> routesVector;
    int numOfBatches = 10;
    int numOfInnerRuns = 4;
    int digitsInLine = 15;

    for (int i = 0; i < numOfBatches; i++)
    {
        std::cout << "\nSimulation run #" << i << std::endl;

        for(int innerRuns = 0; innerRuns < numOfInnerRuns; innerRuns++)
        {
            Simulator simulator;
            std::shared_ptr<Route> route = std::make_shared<Route>();
            std::thread T2 = simulator.threadRunSimulationOnce(innerRuns);
            std::thread T3 = route->threadReadInput();
            T2.join();
            T3.join();
            routesVector.push_back(route);
        }

        for(int j = 0; j < numOfInnerRuns; j++) {
            std::string str;
            for(int k = 0; k < digitsInLine; k++) str += std::to_string(j);
            std::cout << "Route number " << j << " (should show " << str << "): " << routesVector.at(i)->data << std::endl;
        }
    }
    return 0;
}
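In the reporting loop, routesVector.at(i) reads the same element for every j. Because routesVector grows by numOfInnerRuns entries per batch, element i is inner run i of the first batch (for i < numOfInnerRuns). That is consistent with the output below, where run #1 prints only ones. Using j alone would make the digits look right, but it would always show the first batch's routes. The route for inner run j of batch i sits at index i * numOfInnerRuns + j.

A small sketch of that indexing. routeIndex and reportBatch are hypothetical helper names, and a plain vector of strings stands in for routesVector[k]->data.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// routesVector receives numOfInnerRuns entries per batch, in order, so the
// route for inner run j of batch i is at index i * numOfInnerRuns + j.
std::size_t routeIndex(int batch, int innerRun, int numOfInnerRuns) {
    return static_cast<std::size_t>(batch * numOfInnerRuns + innerRun);
}

// Builds the report lines for one batch; data[k] plays the role of
// routesVector.at(k)->data.
std::vector<std::string> reportBatch(const std::vector<std::string>& data,
                                     int batch, int numOfInnerRuns) {
    std::vector<std::string> lines;
    for (int j = 0; j < numOfInnerRuns; ++j) {
        lines.push_back("Route number " + std::to_string(j) + ": " +
                        data.at(routeIndex(batch, j, numOfInnerRuns)));
    }
    return lines;
}
```

With numOfInnerRuns = 4, inner run 3 of batch 2 maps to index 11.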

Code for "simulation" executable:

#include <fstream>
#include <string>

int main(int argc, char *argv[])
{
    if (argc < 2) return 1;   // expects the run number as the first argument
    int digitsInLine = 15;
    std::ofstream file;
    file.open(OUTPUTFILEPATH);
    for (int j = 0; j < digitsInLine; j++) { file << argv[1]; }
    file << std::endl;
    file.close();
    return 0;
}

Issue: Something is wrong with this synchronization pattern and I can't seem to find where. It comes from the great educational video linked above, so maybe I got something wrong.

In the video, as well as in several other examples I found online, the producer and consumer each run inside a while(1) loop.

If I use while(1), I get a deadlock.

If I simply comment out the while(1) in the producer and the consumer, I sometimes get overlapping routes. The overlap occurs only occasionally, in a non-deterministic fashion. I checked, and the data is indeed identical across overlapping routes even though the input parameters to the simulation differed. That means the same output file is occasionally read twice.

I need to break out of the infinite loop. I thought of doing this with booleans, but that didn't work.
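One common way to end the loop is a second flag that the producer sets after its last item. The consumer checks it inside its wait predicate, so it wakes up and exits instead of waiting forever for a ready that never comes.

A minimal sketch, assuming an int slot in place of the output file. done and consumeUntilDone are illustrative names. Both flags are only read and written while the mutex is held.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Two-phase handshake where the consumer does not know how many items are
// coming: the producer sets `done` after its last item, and the consumer's
// wait predicate checks both flags, so it cannot block forever.
std::vector<int> consumeUntilDone(int numItems) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    bool done = false;
    int slot = 0;
    std::vector<int> received;

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lk(m);
        while (true) {
            cv.wait(lk, [&] { return ready || done; });
            if (ready) {
                received.push_back(slot);
                ready = false;
                cv.notify_one();      // let the producer continue
            } else {
                break;                // done, and nothing left to consume
            }
        }
    });

    std::thread producer([&] {
        std::unique_lock<std::mutex> lk(m);
        for (int i = 0; i < numItems; ++i) {
            slot = i;
            ready = true;
            cv.notify_one();
            cv.wait(lk, [&] { return !ready; });   // wait until consumed
        }
        done = true;
        cv.notify_one();              // wake the consumer so it can exit
    });

    producer.join();
    consumer.join();
    return received;
}
```

consumeUntilDone(4) should return 0, 1, 2, 3. With zero items, the consumer exits as soon as it sees done.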

Help would be appreciated. Thank you :)

Output:

Simulation run #0
Route number 0 (should show 000000000000000): 000000000000000
Route number 1 (should show 111111111111111): 000000000000000
Route number 2 (should show 222222222222222): 000000000000000
Route number 3 (should show 333333333333333): 000000000000000

Simulation run #1
Route number 0 (should show 000000000000000): 111111111111111
Route number 1 (should show 111111111111111): 111111111111111
Route number 2 (should show 222222222222222): 111111111111111
Route number 3 (should show 333333333333333): 111111111111111

Simulation run #2
Route number 0 (should show 000000000000000): 222222222222222
Route number 1 (should show 111111111111111): 222222222222222
Route number 2 (should show 222222222222222): 222222222222222
Route number 3 (should show 333333333333333): 222222222222222
  • First rule of writing thread-safe code: never hold a mutex while executing code you don't own. Second rule: never hold a mutex for longer than you need to. Neither rule is followed here, and I would imagine there are problems as a consequence. Lastly, the thread-safety code is leaking into the program logic; normally it should be encapsulated. Commented Jul 25, 2024 at 18:57
  • If you are only interested in the end product, there is a GitHub implementation at lmax-exchange.github.io/disruptor. If this is for learning purposes, ignore this. Commented Jul 25, 2024 at 19:39
  • In routesVector.at(i)->data you have a typo: i in place of j. Commented Jul 25, 2024 at 20:11
  • @AhmedAEK Wow, you're right. Now the results of the minimal reproducible example look as expected. Do you think the exit condition while(!readOnce) makes sense here? Anyway, I'm going to test this in my project and see what happens. Thank you very much! Commented Jul 25, 2024 at 20:27
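Two follow-ups to the comments, sketched under assumptions rather than as a definitive fix.

First, on while(!readOnce): both threads evaluate that condition, and the producer executes its final readOnce = 0, without holding the mutex. The producer's write and the consumer's last read are therefore not synchronized. That is a data race (undefined behavior in C++), and in the worst case the consumer can read 0, re-enter its loop, and wait forever.

Second, as suggested in the comments, the mutex, condition variable and flags can be encapsulated so that Simulator and Route never see them. Below is a hypothetical one-slot Mailbox class (C++17, for std::optional) that performs the two-phase handshake inside put() and take(). A close() call replaces the shared exit flag.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical one-slot mailbox: put() blocks until the previous item was
// taken; take() blocks until an item arrives and returns std::nullopt once
// the box is closed and empty.
template <typename T>
class Mailbox {
public:
    void put(T value) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return !item_.has_value(); });
        item_ = std::move(value);
        lk.unlock();
        cv_.notify_all();
    }

    std::optional<T> take() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return item_.has_value() || closed_; });
        if (!item_) return std::nullopt;   // closed and drained
        std::optional<T> out = std::move(item_);
        item_.reset();
        lk.unlock();
        cv_.notify_all();
        return out;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lk(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::optional<T> item_;
    bool closed_ = false;
};

// Usage sketch: the main thread posts routes, a consumer thread drains them.
std::vector<std::string> exchangeRoutes(int numRoutes) {
    Mailbox<std::string> box;
    std::vector<std::string> received;
    std::thread consumer([&] {
        while (std::optional<std::string> route = box.take()) {
            received.push_back(*route);
        }
    });
    for (int i = 0; i < numRoutes; ++i) {
        box.put(std::string(15, static_cast<char>('0' + i % 10)));
    }
    box.close();
    consumer.join();
    return received;
}
```

The consumer drains the last item before close() takes effect, because take() only returns std::nullopt once the slot is empty. With this design, the calling code needs no readOnce-style flag at all.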
