Beginner-level synchronization question here. There’s a lot of material on how to implement producer-consumer synchronization. I tried every variation that I thought made sense, but none seems to work, and I've been tackling this for a few days now.
I’m trying to implement producer-consumer code in the form that’s given in this great video, see minute 31:40. It’s a two-phase handshake, and I thought it would be right to use it for the general case.
I created a minimal reproducible example. To run it, insert your own paths at OUTPUTFILEPATH (twice) and BINARYPATH (find them with Ctrl+F). I must say it looks like it gives a deterministic (and incorrect) outcome. It's 100% not a correct implementation, hence this post :)
Header.h:
#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
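#include <string>
#include <memory>   // std::shared_ptr, std::make_shared (used in Main)
#include <cstdlib>  // std::system (used in Simulator.cpp)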
Simulator.h:
#include "Header.h"
extern std::mutex gSyncOutputReading_mutex;
extern std::condition_variable gSyncOutputReading_cv;
extern bool gSyncOutputReading_ready;
extern int readOnce;
class Simulator {
public:
    std::thread threadRunSimulationOnce(int innerRuns) {
        return std::thread([=] { RunSimulationOnce(innerRuns); });
    }
    /* Producer */
    void RunSimulationOnce(int innerRuns);
};
Route.h:
#include "Header.h"
extern std::mutex gSyncOutputReading_mutex;
extern std::condition_variable gSyncOutputReading_cv;
extern bool gSyncOutputReading_ready;
extern int readOnce;
class Route {
public:
    std::string data;
    std::thread threadReadInput() {
        return std::thread([=] {readInput();});
    }
    /* Consumer */
    void readInput();
};
Simulator is a class that holds simulation-related metadata. It has a method that runs a simulation executable (source code given below) from the command line. In the original project a single run takes around 400 ms, and at the end of the run a flat output file is updated with a new route. Route is a class for handling the routes themselves; it has a method that reads in the data from the output file that a simulation run updated.
Simulator.cpp (Producer):
#include "Simulator.h"
/* Producer */
void Simulator::RunSimulationOnce(int innerRuns) {
    //bool produced = false;
    //while(produced == false) {
    while(!readOnce) {
        std::unique_lock<std::mutex> ul(gSyncOutputReading_mutex); // xxxx
        std::string COMMAND = BINARYPATH + std::to_string(innerRuns);
        std::system(COMMAND.c_str());
        gSyncOutputReading_ready = true;
        ul.unlock();
        gSyncOutputReading_cv.notify_one();
        ul.lock();
        gSyncOutputReading_cv.wait(ul, [](){ return gSyncOutputReading_ready == false; });
        //produced = true;
    }
    readOnce = 0;
}
Route.cpp (Consumer):
#include "Route.h"
/* Consumer */
void Route::readInput() {
    //bool consumed = false;
    //while(consumed == false) {
    //while(1) {
    while(!readOnce) {
        std::unique_lock<std::mutex> ul(gSyncOutputReading_mutex);
        gSyncOutputReading_cv.wait(ul, [](){ return gSyncOutputReading_ready; });
        std::ifstream file(OUTPUTFILEPATH, std::ios_base::in);
        if ((file.is_open()))
        {
            std::string line;
            std::getline(file, line);
            this->data = line;
            file.close();
        }
        else { std::cerr << "Unable to open output file " << std::endl;}
        gSyncOutputReading_ready = false;
        readOnce = 1;
        ul.unlock();
        gSyncOutputReading_cv.notify_one();
        ul.lock();
        //consumed = true;
    }
}
Main:
#include "Header.h"
#include "Simulator.h"
#include "Route.h"
std::mutex gSyncOutputReading_mutex;
std::condition_variable gSyncOutputReading_cv;
bool gSyncOutputReading_ready = false;
int readOnce = 0;
int main()
{
    std::vector<std::shared_ptr<Route>> routesVector;
    int numOfBatches = 10;
    int numOfInnerRuns = 4;
    int digitsInLine = 15;
    for (int i = 0; i < numOfBatches; i++)
    {
        std::cout << "\nSimulation run #" << i << std::endl;
        for(int innerRuns = 0; innerRuns < numOfInnerRuns; innerRuns++)
        {
            Simulator simulator;
            std::shared_ptr<Route> route = std::make_shared<Route>();
            std::thread T2 = simulator.threadRunSimulationOnce(innerRuns);
            std::thread T3 = route->threadReadInput();
            T2.join();
            T3.join();
            routesVector.push_back(route);
        }
        for(int j = 0; j < numOfInnerRuns; j++) {
            std::string str;
            for(int k = 0; k < digitsInLine; k++) str += std::to_string(j);
            std::cout << "Route number " << j << " (should show " << str << "): " << routesVector.at(i)->data << std::endl;
        }
    }
    return 0;
}
Code for "simulation" executable:
#include <fstream>
#include <string>
int main(int argc, char *argv[])
{
    (void) argc;
    int digitsInLine = 15;
    std::ofstream file;
    file.open(OUTPUTFILEPATH);
    for (int j = 0; j < digitsInLine; j++) {file << argv[1]; }
    file << std::endl;
    file.close();
    return 0;
}
Issue: Something is wrong with this synchronization pattern and I can't find where. It was given in the great educational video linked above, so maybe I got something wrong.
There, as in several other examples found online, the producer and consumer each run inside a while(1).
If I use while(1), I get a deadlock.
If I simply comment out the while(1) inside the producer and consumer, I sometimes get overlapping routes. The overlap occurs occasionally, in a non-deterministic fashion. I checked, and the data is indeed identical within overlapping routes even though the input parameters to the simulation varied, which means that occasionally the same file is being read.
I need to break the infinite loop. I thought of doing this with booleans, but that didn’t work.
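For reference, one possible way to break the loop is to run the handshake for a fixed number of rounds instead of while(1). The following is only a minimal sketch under assumptions, not the code from the video: runHandshake, slot and the other names are hypothetical, an in-memory string stands in for the output file and the std::system call, and the producer waits at the top of each round rather than at the bottom.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the posted project): runs `rounds`
// producer/consumer handshakes and returns what the consumer read.
std::vector<std::string> runHandshake(int rounds, int digitsInLine = 15) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;                 // true while `slot` holds unread data
    std::string slot;                   // assumption: stands in for the output file
    std::vector<std::string> consumed;  // touched only by the consumer thread

    std::thread producer([&] {
        for (int r = 0; r < rounds; ++r) {
            std::unique_lock<std::mutex> ul(m);
            // Wait until the previous item has been consumed.
            cv.wait(ul, [&] { return !ready; });
            slot.assign(static_cast<std::size_t>(digitsInLine),
                        static_cast<char>('0' + r % 10));
            ready = true;
            ul.unlock();
            cv.notify_one();
        }
    });

    std::thread consumer([&] {
        for (int r = 0; r < rounds; ++r) {
            std::unique_lock<std::mutex> ul(m);
            // Wait until the producer has published a new item.
            cv.wait(ul, [&] { return ready; });
            consumed.push_back(slot);
            ready = false;
            ul.unlock();
            cv.notify_one();
        }
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

Under these assumptions, runHandshake(4) should return four strings, where entry r consists of fifteen copies of the digit r. Both threads loop exactly the same number of times, so neither is left waiting on the condition variable after the last round.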
Help would be appreciated. Thank you :)
Output:
Simulation run #0
Route number 0 (should show 000000000000000): 000000000000000
Route number 1 (should show 111111111111111): 000000000000000
Route number 2 (should show 222222222222222): 000000000000000
Route number 3 (should show 333333333333333): 000000000000000
Simulation run #1
Route number 0 (should show 000000000000000): 111111111111111
Route number 1 (should show 111111111111111): 111111111111111
Route number 2 (should show 222222222222222): 111111111111111
Route number 3 (should show 333333333333333): 111111111111111
Simulation run #2
Route number 0 (should show 000000000000000): 222222222222222
Route number 1 (should show 111111111111111): 222222222222222
Route number 2 (should show 222222222222222): 222222222222222
Route number 3 (should show 333333333333333): 222222222222222
routesVector.at(i)->data: you have a typo of i in place of j here ...
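To illustrate the comment: routesVector is declared outside the batch loop and grows by numOfInnerRuns entries per batch, so at(i) prints one early entry for every line of a batch, which matches the repeated lines in the output. A minimal sketch of the index arithmetic, assuming the routes are stored in push order (flatIndex is a hypothetical helper, not part of the posted code):

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: position of inner run `inner` of batch `batch` in a
// vector that receives `innerPerBatch` entries per batch, in order.
std::size_t flatIndex(std::size_t batch, std::size_t inner,
                      std::size_t innerPerBatch) {
    return batch * innerPerBatch + inner;
}
```

With numOfInnerRuns = 4, the print loop could then read routesVector.at(flatIndex(i, j, numOfInnerRuns)) to reach route j of batch i.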