3

I am new to corotine and asio. I am working on a local process management tool where a client is responsible for launching a program. This client sends commands to a backend daemon that manages the process. I am using a single-threaded, multi-coroutine model. My daemon needs to interact with the child process to obtain logs for IPC transmission to the client.

My question is: why does my daemon get stuck at code 1 async_read_some and not trigger reading stdout, while code 2 works fine?

Here is a simple demo:

client code:

#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>

void receive_output(int sockfd)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0)
            break;
        buffer[n] = '\0';
        std::cout << buffer << std::flush;
    }
}

void send_input(int sockfd)
{
    std::string input;
    while (std::getline(std::cin, input))
    {
        input += '\n';
        send(sockfd, input.c_str(), input.size(), 0);
    }
}

int main()
{
    int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    struct sockaddr_un addr;
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/tmp/socket");

    connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));

    std::string command = "test";
    send(sockfd, command.c_str(), command.size(), 0);

    std::thread output_thread(receive_output, sockfd);
    std::thread input_thread(send_input, sockfd);

    output_thread.join();
    input_thread.join();

    close(sockfd);
    return 0;
}

server code:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>

using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;

boost::asio::io_context io_context;

void set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl(F_GETFL)");
        return;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1)
    {
        perror("fcntl(F_SETFL)");
    }
}

awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
    char buffer[256];
    std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
    buffer[len] = '\0';
    co_return std::string(buffer);
}

awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
                                   int client_fd)
{
    int master_fd;
    pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
    //   set_nonblocking(master_fd);
    if (pid == 0)
    {
        // Child process: Set user and change directory
        setvbuf(stdout, nullptr, _IONBF, 0);

        // execute echo just for test
        // execl(command.c_str(), "", NULL);
        execl("/bin/echo", "echo", "Testing PTY output!", NULL);

        std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
        exit(EXIT_FAILURE);
    }
    else
    {
        // Parent process: Handle I/O between client and pty
        asio::posix::stream_descriptor pty_stream(io_context, master_fd);

        // handle output
        co_spawn(
            io_context,
            [&client_stream, &pty_stream]() -> awaitable<void> {
                char buffer[1024];
                try
                {
                    while (true)
                    {
                        // code 1
                        // std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);

                        // code 2
                        ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));

                        std::cout << buffer << std::endl;
                        if (n == 0)
                            break; // End of stream

                        //   co_await asio::async_write(
                        //   client_stream, boost::asio::buffer(buffer, n),
                        //   use_awaitable);
                    }
                }
                catch (std::exception &e)
                {
                    std::cerr << strerror(errno) << std::endl;
                }
                co_return;
            },
            asio::detached);

        // handle input



        waitpid(pid, nullptr, 0);

        co_return;
    }
}

awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
    std::string command = co_await receive_command(client_stream);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client_stream, client_fd);
    client_stream.close();
}

awaitable<int> async_accept(int server_fd)
{
    int client_fd;
    co_await asio::post(io_context, use_awaitable);
    client_fd = accept(server_fd, nullptr, nullptr);
    if (client_fd < 0)
    {
        std::cerr << "Failed to accept client connection" << std::endl;
        co_return - 1;
    }
    std::cout << "clientfd, " << client_fd << std::endl;
    set_nonblocking(client_fd);
    co_return client_fd;
}

awaitable<void> daemon_loop()
{
    const char *socket_path = "/tmp/socket";

    if (access(socket_path, F_OK) == 0)
    {
        if (unlink(socket_path) != 0)
        {
            std::cerr << "Failed to remove existing socket file" << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd < 0)
    {
        std::cerr << "Failed to create socket" << std::endl;
        exit(EXIT_FAILURE);
    }
    //   set_nonblocking(server_fd);

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, socket_path);
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        std::cerr << "Failed to bind socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 5) < 0)
    {
        std::cerr << "Failed to listen on socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    asio::posix::stream_descriptor server_stream(io_context, server_fd);

    while (true)
    {
        int client_fd = co_await async_accept(server_fd);
        asio::posix::stream_descriptor client_stream(io_context, client_fd);
        std::cout << "Accepted connection" << std::endl;
        co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
    }
}

int main()
{
    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
    return 0;
}

Here is the different running result between code 1 and code 2

code 1: enter image description here

code 2:

enter image description here

1 Answer 1

2

The most likely problem is your use of ::waitpid which blocks the only available IO service thread.

Your co_spawn in run_process_in_pty also plays fast and loose with object lifetimes. It captures client_stream and pty_stream by reference, but they are either locals or references from another coroutine frame.

Regardless, all the code just looks highly overcomplicated, somehow forcing C-style network code into Asio's coroutine framework. Why not use Asio? So you can get:

awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client, client.native_handle());
    // client.close(); // implicit due to RAII lifetime
}

awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

BONUS DEMO

As a bonus, let me also demonstrate how to use Boost Process for the child process instead:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/process/v2.hpp>
#include <filesystem>
#include <iostream>
#include <pty.h>
#include <sys/wait.h>

namespace asio = boost::asio;
namespace bp   = boost::process::v2;
using UNIX     = asio::local::stream_protocol;
using Socket   = UNIX::socket;
using std::filesystem::path;

asio::awaitable<std::string> receive_command(UNIX::socket& client_stream) {
#if 1
    std::string buf;
    co_await async_read_until(client_stream, asio::dynamic_buffer(buf), '\n', asio::deferred);
    while (buf.ends_with('\n'))
        buf.pop_back();
    co_return buf;
#else
    char buffer[512];
    auto n = co_await client_stream.async_read_some(asio::buffer(buffer), asio::deferred);
    co_return std::string(buffer, n);
#endif
}

asio::awaitable<void> run_process( //
    [[maybe_unused]] std::string const& command, Socket& client_stream) try {
    asio::any_io_executor ex = co_await asio::this_coro::executor;

    //bp::popen child(ex, "/bin/echo", {"Testing PTY output for command [", command, "]"});
    bp::popen child(ex, "/bin/sh", {"-c", command});

    // int client_fd = client_stream.native_handle();

    for (char buffer[1024]; ;) {
        // auto [ec, n] = co_await child.async_read_some(boost::asio::buffer(buffer),
        // asio::as_tuple(asio::deferred));
        auto n = co_await child.async_read_some(boost::asio::buffer(buffer));

        std::cout << quoted(std::string_view(buffer, n)) << std::endl;

        if (n)
            co_await async_write(client_stream, boost::asio::buffer(buffer, n), asio::deferred);

        // if (ec) {
        // std::cout << "End of stream (" << ec.message() << ")" << std::endl;
        // break;
        //}
    }
} catch (boost::system::system_error const& se) {
    std::cerr << "run_process: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
    std::cerr << "run_process: " << e.what() << std::endl;
}

asio::awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process(command, client);
    client.close(); // implicit due to RAII lifetime
}

asio::awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

With a local, more functional demonstration:

Sign up to request clarification or add additional context in comments.

1 Comment

Of course you can mix and match if you need the pty: coliru.stacked-crooked.com/a/c3ed7a9b6222c572

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.