I am new to corotine and asio. I am working on a local process management tool where a client is responsible for launching a program. This client sends commands to a backend daemon that manages the process. I am using a single-threaded, multi-coroutine model. My daemon needs to interact with the child process to obtain logs for IPC transmission to the client.
My question is: why does my daemon get stuck at code 1 async_read_some and not trigger reading stdout, while code 2 works fine?
Here is a simple demo:
client code:
#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>
void receive_output(int sockfd)
{
char buffer[1024];
while (true)
{
ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (n <= 0)
break;
buffer[n] = '\0';
std::cout << buffer << std::flush;
}
}
void send_input(int sockfd)
{
std::string input;
while (std::getline(std::cin, input))
{
input += '\n';
send(sockfd, input.c_str(), input.size(), 0);
}
}
int main()
{
int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, "/tmp/socket");
connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));
std::string command = "test";
send(sockfd, command.c_str(), command.size(), 0);
std::thread output_thread(receive_output, sockfd);
std::thread input_thread(send_input, sockfd);
output_thread.join();
input_thread.join();
close(sockfd);
return 0;
}
server code:
#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;
boost::asio::io_context io_context;
void set_nonblocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl(F_GETFL)");
return;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl(F_SETFL)");
}
}
awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
char buffer[256];
std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
buffer[len] = '\0';
co_return std::string(buffer);
}
awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
int client_fd)
{
int master_fd;
pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
// set_nonblocking(master_fd);
if (pid == 0)
{
// Child process: Set user and change directory
setvbuf(stdout, nullptr, _IONBF, 0);
// execute echo just for test
// execl(command.c_str(), "", NULL);
execl("/bin/echo", "echo", "Testing PTY output!", NULL);
std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
exit(EXIT_FAILURE);
}
else
{
// Parent process: Handle I/O between client and pty
asio::posix::stream_descriptor pty_stream(io_context, master_fd);
// handle output
co_spawn(
io_context,
[&client_stream, &pty_stream]() -> awaitable<void> {
char buffer[1024];
try
{
while (true)
{
// code 1
// std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
// code 2
ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));
std::cout << buffer << std::endl;
if (n == 0)
break; // End of stream
// co_await asio::async_write(
// client_stream, boost::asio::buffer(buffer, n),
// use_awaitable);
}
}
catch (std::exception &e)
{
std::cerr << strerror(errno) << std::endl;
}
co_return;
},
asio::detached);
// handle input
waitpid(pid, nullptr, 0);
co_return;
}
}
awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
std::string command = co_await receive_command(client_stream);
std::cout << "Received command: " << command << std::endl;
co_await run_process_in_pty(command, client_stream, client_fd);
client_stream.close();
}
awaitable<int> async_accept(int server_fd)
{
int client_fd;
co_await asio::post(io_context, use_awaitable);
client_fd = accept(server_fd, nullptr, nullptr);
if (client_fd < 0)
{
std::cerr << "Failed to accept client connection" << std::endl;
co_return - 1;
}
std::cout << "clientfd, " << client_fd << std::endl;
set_nonblocking(client_fd);
co_return client_fd;
}
awaitable<void> daemon_loop()
{
const char *socket_path = "/tmp/socket";
if (access(socket_path, F_OK) == 0)
{
if (unlink(socket_path) != 0)
{
std::cerr << "Failed to remove existing socket file" << std::endl;
exit(EXIT_FAILURE);
}
}
int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd < 0)
{
std::cerr << "Failed to create socket" << std::endl;
exit(EXIT_FAILURE);
}
// set_nonblocking(server_fd);
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, socket_path);
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
std::cerr << "Failed to bind socket" << std::endl;
exit(EXIT_FAILURE);
}
if (listen(server_fd, 5) < 0)
{
std::cerr << "Failed to listen on socket" << std::endl;
exit(EXIT_FAILURE);
}
asio::posix::stream_descriptor server_stream(io_context, server_fd);
while (true)
{
int client_fd = co_await async_accept(server_fd);
asio::posix::stream_descriptor client_stream(io_context, client_fd);
std::cout << "Accepted connection" << std::endl;
co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
}
}
int main()
{
std::cout << "start" << std::endl;
co_spawn(io_context, daemon_loop(), asio::detached);
io_context.run();
return 0;
}
Here is the different running result between code 1 and code 2
code 2:


