UPDATE
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <stdexcept>

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>

class buff {
public:
    using value_type = std::array<uint8_t, 1024>;
    value_type::iterator begin() { return data_.begin(); }
    value_type::iterator end() { return data_.end(); }
    value_type& data() { return data_; }
    size_t& capacity() { return capacity_; }
    void clear() { for (auto& x : data_) x = 0; capacity_ = 0; }
    bool busy() { return busy_; }
    void set_busy(bool f) { busy_ = f; }
private:
    bool busy_ = false;
    size_t capacity_ = 0;
    value_type data_{};
};
class Buffer_provider {
public:
    using pointer = boost::shared_ptr<Buffer_provider>;

    buff* take()
    {
        // lock_guard releases the mutex even if we throw below
        std::lock_guard<std::mutex> lock(sync_);
        auto it = std::find_if(
            buffers_.begin(),
            buffers_.end(),
            [](buff& b) -> bool { return !b.busy(); }
        );
        if (it == buffers_.end()) {
            throw std::runtime_error("Uuuuppss....!!!!");
        }
        it->set_busy(true);
        return &*it;
    }

    void release(buff* buffer)
    {
        std::lock_guard<std::mutex> lock(sync_);
        buffer->clear();
        buffer->set_busy(false);
    }

private:
    std::mutex sync_;
    std::array<buff, 4> buffers_;
};
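To make the intended usage of the pool explicit, here is a tiny sketch (pool_usage_example is just an illustrative name, it is not part of my real code) of how take() and release() behave once all four slots are busy:

void pool_usage_example()       // illustration only
{
    Buffer_provider pool;
    buff* a = pool.take();      // marks the first free slot busy
    buff* b = pool.take();
    buff* c = pool.take();
    buff* d = pool.take();      // all four slots are now busy
    // buff* e = pool.take();   // would throw: no free buffer left
    pool.release(a);            // zeroes the slot and marks it free again
    buff* e = pool.take();      // succeeds, reusing the released slot
}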
class connection : public boost::enable_shared_from_this<connection> {
public:
    using value_type = boost::shared_ptr<connection>;
    using socket_type = boost::asio::ip::tcp::socket;

    connection(boost::asio::io_context& io_context, Buffer_provider::pointer buffers)
        : socket_(io_context), buffers_(buffers) { }
    ~connection() { std::cout << "Total: " << total << std::endl; }

    boost::asio::ip::tcp::socket& socket() { return socket_; }

    int total = 0;

    void wait_for_data()
    {
        socket_.async_wait(
            boost::asio::ip::tcp::socket::wait_read,
            boost::bind(
                &connection::read_header,
                shared_from_this(),
                boost::asio::placeholders::error
            )
        );
    }

    void read_header(const boost::system::error_code& ec)
    {
        if (ec) {
            return;     // the wait failed (e.g. the socket was closed)
        }
        auto buffer = buffers_->take();
        boost::asio::async_read(
            socket_,
            boost::asio::buffer(buffer->data(), Header::size),
            boost::bind(
                &connection::read_header_done,
                shared_from_this(),
                buffer,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred
            )
        );
    }

    void read_header_done(buff* buffer, const boost::system::error_code& ec, size_t bytes_transferred)
    {
        if (ec) {
            buffers_->release(buffer);
            return;
        }
        auto buffer_data_it = buffer->begin();
        Header header;
        // capacity_ is never updated by the read path, so use the byte count
        // reported by async_read
        header.read(buffer_data_it, bytes_transferred);
        buffers_->release(buffer);
        // re-arm the wait on the io_context worker threads via the socket's executor
        boost::asio::post(
            socket_.get_executor(),
            boost::bind(&connection::wait_for_data, shared_from_this())
        );
    }

private:
    boost::asio::ip::tcp::socket socket_;
    boost::shared_ptr<Buffer_provider> buffers_;
};
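For context, the surrounding setup looks roughly like the sketch below (simplified; do_accept, run_server and the port number are made up for illustration, only the "many connections, 4 worker threads" shape matters):

#include <thread>
#include <vector>
#include <boost/make_shared.hpp>

void do_accept(boost::asio::io_context& io_context,
               boost::asio::ip::tcp::acceptor& acceptor,
               Buffer_provider::pointer buffers)
{
    auto conn = boost::make_shared<connection>(io_context, buffers);
    acceptor.async_accept(
        conn->socket(),
        [&io_context, &acceptor, buffers, conn](const boost::system::error_code& ec) {
            if (!ec) {
                conn->wait_for_data();                  // start waiting for data on this client
            }
            do_accept(io_context, acceptor, buffers);   // accept the next client
        }
    );
}

void run_server()
{
    boost::asio::io_context io_context;
    auto buffers = boost::make_shared<Buffer_provider>();
    boost::asio::ip::tcp::acceptor acceptor(
        io_context,
        boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 5000));
    do_accept(io_context, acceptor, buffers);

    // 4 worker threads run the io_context and process every queued handler
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([&io_context] { io_context.run(); });
    }
    for (auto& w : workers) {
        w.join();
    }
}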
Let's assume that I have, e.g., 1000 active connections and 4 worker threads which process all operations on the io_context.
In this particular case, when a socket becomes ready for reading, the connection object takes one buffer from the pool and then starts an async_read for whatever data is coming in on that socket.
I think this can lead to a situation where a buffer has already been taken from the pool, but the async_read operation is only queued: the buffer stays locked while the worker thread picks up the next pending read on another socket, and, just as before, another buffer gets taken.
This can exhaust all of the available buffers, because, as the documentation says: "Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. Invocation of the handler will be performed in a manner equivalent to using boost::asio::io_service::post()."
Am I correct?
How can I avoid this kind of situation?
What is socket::async_wait actually useful for?
Should I do a synchronous read from the socket in the async_wait completion handler?
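To make that last question concrete, this is roughly the variant I have in mind (an untested sketch; read_header_sync is just a name I made up): take a buffer only inside the wait_read completion handler, read synchronously, and release it before returning, so a buffer is held only while a worker thread is actually pulling data off the socket.

// Untested sketch: would replace read_header in the wait_for_data binding.
void read_header_sync(const boost::system::error_code& ec)
{
    if (ec) {
        return;
    }
    auto buffer = buffers_->take();
    boost::system::error_code read_ec;
    // wait_read has fired, so some data is available and read_some should
    // return quickly (this ignores short reads, which is part of my doubt)
    size_t n = socket_.read_some(
        boost::asio::buffer(buffer->data(), Header::size), read_ec);
    if (!read_ec) {
        auto it = buffer->begin();
        Header header;
        header.read(it, n);
    }
    buffers_->release(buffer);
    wait_for_data();    // re-arm the wait for the next piece of data
}

If I understand it correctly, the number of buffers would then only need to match the number of worker threads (4 here), not the number of connections.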