I am currently tasked with writing a generic thread pool in C++, with multiple worker threads and one scheduling thread. To support running any kind of function within one pool, I used variadic templates and parameter packs, with which I have little experience. My thread pool and worker classes look like this:

template<typename R, typename ...A>
class Worker {
  public:
    // starts thread that runs during lifetime of worker object
    Worker() : thread_([this] {
        while (!stopped) {
            // run task that worker has been set to and remove it thereafter
            if (task_ != NULL) {
                idle = false;
                task_(std::get<A>(args_)...);
                task_ = NULL;
            }
            idle = true;
            
        }
    }) { }

    ~Worker() {
        stop();
    }

    void stop() {
        stopped = true;
        thread_.join();
    }

    bool idling() {
        return idle;
    }

    
    void set_work(std::function<R(A...)> task, std::tuple<A...> args) {
        task_ = task;
        args_ = args;
    }

  private:
    std::thread thread_;
    std::function<R(A...)> task_;
    std::tuple<A...> args_;
    bool idle = false;
    bool stopped = false;
};

template<typename R, typename ...A>
class ThreadPool {
  public:
    // pool runs scheduling thread which assigns queued tasks to idling workers
    ThreadPool(size_t num_workers) : workers(num_workers), num_workers_(num_workers), runner([this, num_workers] {
        while(!stopped) {
            for (size_t i = 0; i < num_workers; i++) {
                if (workers[i].idling() && !q.empty()) {
                    workers[i].set_work(q.front().first, q.front().second);
                    q.pop();
                }
            }
        }
    }) { }

    void add_task(std::function<R(A...)> task, A... args) {
        q.push({task, std::make_tuple(args...)});
    }

    size_t tasks_left() {
        return q.size();
    }

    size_t workers_idling() {
        size_t n = 0;
        for (size_t i = 0; i < num_workers_; i++) {
            if (workers[i].idling()) n++;
        }
        return n;
    }

    void stop() {
        for (size_t i = 0; i < num_workers_; i++) {
            workers[i].stop();
        }
        stopped = true;
        runner.join();
    }

  private:
    std::vector<Worker<R, A...>> workers;
    std::queue<std::pair<std::function<R(A...)>, std::tuple<A...>>> q;
    std::thread runner;
    bool stopped = false;
    size_t num_workers_;
};

The first hurdle I encountered was that I could not use references as variadic types, so I passed whole objects by value instead. But any class without a default constructor produces the following compilation error: https://pastebin.com/ye6enTD3. For classes that do have a default constructor, the member variables the worker sees are not consistently the same as those of the object I passed in.

I would appreciate your help on this topic.

  • First make a simple interface with only a virtual void operator() say task_itf, then you can have a queue of std::shared_ptr<task_itf> for scheduling. Create a task class template specializing on return type of your function only and put your function in a packaged_task (this even allows your threadpool to return futures) inside this task. Then you can have a template<typename fn_t> auto schedule(fn_t fn) -> std::future<decltype(fn())> as entry point. Commented Feb 15, 2023 at 13:01
  • This solution has a very significant problem: it uses zero synchronization. Even if it seems to work, it might crash at any time or randomly not work as expected. You are not allowed to modify an object if that object could potentially be read from or written to by another thread. You need to add mechanisms that keep all but one thread from accessing an object while it is being modified, or only use atomic operations (see std::atomic). Commented Feb 15, 2023 at 13:01
  • In addition to the compilation error, the shown multithreading code is fundamentally flawed. None of the objects that get accessed by multiple execution threads are properly synchronized. This will end in tears. Perhaps, before unraveling variadic templates and their wonders, it will be more beneficial to focus on some multithreading basics? Commented Feb 15, 2023 at 13:14
  • Can you use std::ref as a reference as a variadic type? Or std::reference_wrapper? (I don't normally use either of those, so I'm not sure which is applicable for your use case.) Commented Feb 15, 2023 at 13:19

1 Answer

I would start out with something like this. Note that stopping a thread pool that still has work requires design decisions. And yes, you will need mutexes and condition variables to make everything work and synchronize. A good thread pool implementation is not trivial.

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

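class task_itf
{
public:
    // virtual destructor so tasks can be destroyed safely through the interface
    virtual ~task_itf() = default;
    virtual void operator()() = 0;
};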

template<typename retval_t>
struct task_t final :
    public task_itf
{
public:
    explicit task_t(std::function<retval_t()> fn) :
        m_task{ fn }
    {
    }

    void operator()() override
    {
        m_task();
    }

    auto future()
    {
        return m_task.get_future();
    }

private:
    std::packaged_task<retval_t ()> m_task;
};


class threadpool_t
{
public:
    threadpool_t() :
        m_running{ true }
    {
    }

    template<typename fn_t>
    auto schedule(fn_t fn) -> std::future<decltype(fn())>
    {
        using retval_t = decltype(fn());
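        auto task = std::make_shared<task_t<retval_t>>(fn);
        // take the future before publishing the task, so no worker thread
        // can touch the packaged_task while get_future() runs
        auto result = task->future();

        {
            std::scoped_lock<std::mutex> lock{ m_mtx };
            m_queue.push(task);
        }
        m_queued.notify_one();
        return result;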
    }

private:

    // todo: let the worker threads pick up queue entries one by one.
    // If a thread is finished with a task and there are entries
    // in the queue, it can immediately pick up the next one;
    // otherwise it waits for a signal on m_queued.

    std::mutex m_mtx;
    std::condition_variable m_queued;
    bool m_running;

    // shared_ptr, because we hand over task to another thread later
    std::queue<std::shared_ptr<task_itf>> m_queue;
};

int main()
{
    threadpool_t pool;
    pool.schedule([] {std::cout << "Hello world"; });
}