I am working on an implementation of a concurrent_queue<T> and of a ThreadPool, shown below. My objectives are CPU performance and simplicity. I would be glad to receive critiques, comments, opinions, and suggestions.
The context: I am multithreading C++ code that implements the Monte-Carlo method. Very intensive computations (for both the CPU and the RAM) are performed inside a loop, and the loop is fully parallelizable. In the client code I intend to clone the mutable objects living in the main thread and use the clones in the spawned threads, which in theory yields thread-safe code without needing locks.
Here is the implementation:
The concurrent queue:
#ifndef CONCURRENT_QUEUE_H
#define CONCURRENT_QUEUE_H
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

/// Unbounded FIFO queue shared between producer and consumer threads.
///
/// Apart from construction and destruction, every member function may be called
/// concurrently from any thread: all access to the underlying std::queue and to the
/// interrupt flag happens while holding mutex_.
/// Threads blocked in pop() can be released with interrupt(); reset_interrupt() re-arms
/// the queue so pop() blocks again.
template <class T>
class concurrent_queue
{
    std::queue<T> queue_;
    // mutable so that the const empty() can lock it.
    mutable std::mutex mutex_;
    // Notified by push() (one waiter) and by interrupt() (all waiters).
    std::condition_variable cv_;
    // When true, pop() returns false instead of waiting. Guarded by mutex_.
    bool interrupt_;

public:
    concurrent_queue() : interrupt_(false) {}

    /// Interrupts the queue, waking any thread blocked in pop().
    ~concurrent_queue()
    {
        interrupt();
    }

    /// Returns whether the queue is empty at the time of the call
    /// (a snapshot: another thread may push or pop right after).
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mutex_);
        return queue_.empty();
    }

    /// Non-blocking pop. Moves the front element into t and returns true, or returns
    /// false (t untouched) when the queue is empty. Ignores the interrupt flag.
    bool try_pop(T& t)
    {
        std::lock_guard<std::mutex> lk(mutex_);
        if (queue_.empty())
            return false;
        // Qualified std::move: an unqualified move() only compiles when ADL happens to
        // find std::move (it does not for T = int, for instance).
        t = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    /// Enqueues t and wakes one waiting consumer.
    /// Pass by value, or call push(std::move(x)) to avoid a copy.
    void push(T t)
    {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            queue_.push(std::move(t));
        }
        // Notify after unlocking so the woken thread does not immediately block on mutex_.
        cv_.notify_one();
    }

    /// Blocking pop. Waits until an element is available or the queue is interrupted.
    /// Returns true with the front element moved into t, or false (t untouched) when
    /// interrupted, even if elements are still queued.
    bool pop(T& t)
    {
        std::unique_lock<std::mutex> lk(mutex_);
        // The predicate overload re-checks the condition after every (possibly spurious)
        // wakeup, exactly like an explicit while loop around wait().
        cv_.wait(lk, [this] { return interrupt_ || !queue_.empty(); });
        if (interrupt_)
            return false;
        t = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    /// Sets the interrupt flag and wakes every thread blocked in pop().
    void interrupt()
    {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            interrupt_ = true;
        }
        cv_.notify_all();
    }

    /// Clears the interrupt flag so pop() blocks again.
    /// Takes the lock: an unsynchronized write would race with pop() reading the flag.
    void reset_interrupt()
    {
        std::lock_guard<std::mutex> lk(mutex_);
        interrupt_ = false;
    }

    /// Discards every queued element. The swap happens under the lock (an unlocked swap
    /// races with concurrent push/pop); the old elements are destroyed only after the
    /// lock is released, so expensive destructors do not stall other threads.
    void clear()
    {
        std::queue<T> discarded;
        {
            std::lock_guard<std::mutex> lk(mutex_);
            queue_.swap(discarded);
        }
    }
};
#endif//CONCURRENT_QUEUE_H
The thread pool header:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <chrono>
#include <future>
#include <thread>
#include <vector>
#include "concurrent_queue.h"
using namespace std::chrono_literals;
// A unit of work for the pool: a nullary callable returning bool, wrapped so that its
// result (or exception) is delivered through a future.
using Task = std::packaged_task<bool()>;
// The caller-side handle used to wait for, and read, a Task's result.
using TaskHandle = std::future<bool>;
class ThreadPool
{
    // The one and only instance (defined out of class in the source file).
    static ThreadPool instance_;
    // Tasks waiting to be run by a worker thread or by activeWait().
    concurrent_queue<Task> concurrent_queue_;
    // Worker threads; empty while the pool is inactive.
    std::vector<std::thread> threads_;
    // True between start() and stop().
    // NOTE(review): not synchronized; assumes start()/stop() are only called from a
    // single controlling thread. Confirm with the callers.
    bool is_active_;
    // Number of the calling thread: 0 on threads the pool did not create,
    // 1..numThreads() on worker threads.
    static thread_local size_t my_tls_num_;

    // Body of every worker thread: runs queued tasks until stop() interrupts the queue.
    // The loop is driven solely by pop()'s result, which is read under the queue's mutex.
    // The previous version polled a plain bool that stop() wrote without synchronization
    // (a data race) and ignored pop()'s result. It therefore relied on that racy flag to
    // avoid re-invoking the already-run task still held in t, and calling a packaged_task
    // twice throws std::future_error.
    void threadFunc(const size_t num)
    {
        my_tls_num_ = num;
        Task t;
        while (concurrent_queue_.pop(t))
            t();
    }

    // Default worker count for start(): one less than the hardware concurrency, leaving a
    // core for the calling thread. hardware_concurrency() returns 0 when the value is not
    // computable, and "0 - 1" would wrap around to a huge unsigned count, so fall back to
    // one worker in that case.
    static size_t defaultThreadCount()
    {
        const unsigned hc = std::thread::hardware_concurrency();
        return hc > 0 ? static_cast<size_t>(hc - 1) : 1;
    }

    // The constructor stays private, ensuring a single instance.
    ThreadPool() : is_active_(false) {}

public:
    /// Returns the single pool instance.
    static ThreadPool* getInstance() { return &instance_; }

    /// Number of worker threads currently owned by the pool (0 when inactive).
    size_t numThreads() const { return threads_.size(); }

    /// Number of the calling thread: 0 outside the pool, 1..numThreads() on a worker.
    static size_t threadNum() { return my_tls_num_; }

    /// Launches nThread worker threads, numbered 1..nThread. Does nothing if already active.
    /// @param nThread number of workers; defaults to hardware_concurrency() - 1,
    ///        or 1 when the hardware concurrency is unknown.
    void start(const size_t nThread = defaultThreadCount())
    {
        if (is_active_)
            return; // Only start once
        // Mark active before launching. If std::thread construction throws part-way,
        // stop() (and the destructor) still join the workers that did start, instead of
        // std::terminate being called when joinable std::thread objects are destroyed.
        is_active_ = true;
        threads_.reserve(nThread);
        for (size_t i = 0; i < nThread; ++i)
            threads_.emplace_back(&ThreadPool::threadFunc, this, i + 1);
    }

    /// Stops and joins the workers.
    ~ThreadPool()
    {
        stop();
    }

    /// Stops the pool. Wakes and joins every worker, discards tasks that were never run,
    /// and re-arms the queue so start() can be called again. Does nothing if inactive.
    /// Discarded tasks are destroyed unrun, so their futures become ready with a
    /// std::future_error (broken_promise).
    void stop()
    {
        if (!is_active_)
            return;
        // Make every pop() return false so the workers leave threadFunc().
        concurrent_queue_.interrupt();
        for (std::thread& worker : threads_)
            worker.join();
        threads_.clear();
        concurrent_queue_.clear();
        concurrent_queue_.reset_interrupt();
        is_active_ = false;
    }

    // Forbid copies and moves.
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ThreadPool(ThreadPool&& rhs) = delete;
    ThreadPool& operator=(ThreadPool&& rhs) = delete;

    /// Queues callable c (invoked with no arguments, returning bool) and returns the
    /// future for its result. An exception thrown by c is stored in that future.
    template<typename Callable>
    TaskHandle spawnTask(Callable c)
    {
        Task t(std::move(c));
        TaskHandle f = t.get_future();
        concurrent_queue_.push(std::move(t));
        return f;
    }

    /// Waits for f to become ready, running queued tasks on the calling thread meanwhile.
    /// When the queue is empty it blocks on f instead of spinning.
    /// @return true if at least one task was run by this call.
    bool activeWait(const TaskHandle& f)
    {
        Task t;
        bool ranAny = false;
        // wait_for(zero) polls the future without blocking; spelled without the "0s"
        // literal so this code does not depend on the header-level using-directive.
        while (f.wait_for(std::chrono::seconds::zero()) != std::future_status::ready)
        {
            if (concurrent_queue_.try_pop(t))
            {
                t();
                ranAny = true;
            }
            else
            {
                // Nothing queued to help with: sleep until the awaited task completes.
                f.wait();
            }
        }
        return ranAny;
    }
};
#endif//THREAD_POOL_H
And its source file:
#include "ThreadPool.h"
// Out-of-class definitions of ThreadPool's static data members.
// The singleton, built through the private default constructor.
// NOTE(review): this object is dynamically initialized; a static initializer in another
// translation unit that calls ThreadPool::getInstance()->start() could run before it.
// Confirm no such caller exists.
ThreadPool ThreadPool::instance_{};
// Per-thread number; stays 0 on every thread the pool did not create (e.g. the main thread).
thread_local size_t ThreadPool::my_tls_num_{0};