1

TL;DR

I need a piece of code to multi-thread properly and not return or crash on me. What do I need to do to consume any return or try/catch blocks such that main() does not end?

The story

I have been using cpp with Google protobufs for a short while now and I have run into an issue that is not solved by my attempts of a solution. I started learning server-client socket programs with google protobufs and cpp from this link. I first converted the .proto file to proto3 by editing the .proto file. I was then able to create a CMake file to compile the program.

My issue

I need to make a new client program such that, when the server.cpp is canceled with Ctrl+c, the client program does NOT crash, quit, seg fault, etc. Basically, when the server dies, the client should wait until the server comes back online.

What have I tried?

I have tried 2 things:

  1. I put the entire socket creation, including variable creation, into a massive while loop. I then had a delay at the beginning such that, if the connection could not be made, the program waits and then tries again.
  2. Multithreading with boost

I have tried to use boost::thread and boost::future to create a boost::async. I had a loop in main() that would create a vector (of length 1) of boost::async() objects with the entire rest of the program called in a method passed into the boost::async() constructor. I then simply called the boost::wait_for_all() on the iterator with begin() and end(). This is where I got the futures idea. Here is an example of the syntax:

while (true)
{
    printingmutex.lock();
    cout << "------------------\nStarting initialization of socket.\n" << endl;
    printingmutex.unlock();
    // Now I create the threads.
    std::vector<boost::future<void>> futures;
    futures.push_back(boost::async([host_name]{ doSocketswag(host_name); }));
    boost::wait_for_all(futures.begin(), futures.end());
    // Delay function after disconnection, implementation irrelevant to the question
    sleepApp(2000); // 2 seconds
}

Just for brevity, here is the new .proto file for conversion to proto3:

syntax = "proto3";
message log_packet {
  fixed64 log_time = 1;
  fixed32 log_micro_sec = 2;
  fixed32 sequence_no = 3;
  fixed32 shm_app_id = 4;
  string packet_id = 5;
  string log_level= 6;
  string log_msg= 7;
}

Why is this important?

This is important because, when I have this issue solved, I will be porting the client program to other platforms besides my Ubuntu box, but I will still be using my Ubuntu box, and I need to be able to run a program that operates outside of the sending of messages. I will also be able to add a GUI to the program, so I will need to be able to split the socket thread from the rest of the program, and therefore prevent bad connections from killing the client program.

In conclusion

How do I make the client program multithreaded such that the socket communication does not

  1. kill the program
  2. return up through main()
  3. kill only the socket thread, and nothing else?

A basic, no frills implementation that shows the sockets running separate from the main program will suffice.

Also, to speed up a response, I have a CMake file to compile the client and server and .proto file with. I am sharing it so you guys can spend more time on actually answering the question than dinking around with compiling code.

# Compiling the program, assuming all of the source files are in the same directory:
# mkdir build
# cd build
# cmake ..
# cmake --build . -- -j8 -l8

set(PROJECT_NAME CppProtobufdemo)
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
project(${PROJECT_NAME} LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_definitions(-std=c++11)
# CMAKE_CURRENT_SOURCE_DIR - This is the directory where CMake is running.
set(CMAKE_BINARY_DIR ${CMAKE_SOURCE_DIR}/build)
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR})
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)

find_package( Boost REQUIRED system thread timer chrono)
if(Boost_FOUND)
    message("Boost was found.")
    message(${Boost_INCLUDE_DIR})
endif()

include_directories(${CMAKE_CURRENT_BINARY_DIR})

set(THE_USER $ENV{USER})
message("This is the com user_:" ${THE_USER})

set(PROGRAMS_TO_COMPILE
server
client
)

# These lines are for autogenerating code from the *.proto files with CMake.
find_package(Protobuf REQUIRED)
if(PROTOBUF_FOUND)
    message("Google Protobuf has been found.")
endif()
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# https://cmake.org/cmake/help/git-master/module/FindProtobuf.html
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS message.proto)
# I want to see if I can access these proto files from python scripts
protobuf_generate_python(PROTO_PY message.proto)

foreach(aprogram ${PROGRAMS_TO_COMPILE})
    add_executable(${aprogram} ${PROTO_SRCS} ${PROTO_HDRS} ${PROJECT_SOURCE_DIR}/${aprogram}.cpp )
    target_link_libraries(${aprogram} ${Boost_LIBRARIES})
    target_link_libraries(${aprogram} ${PROTOBUF_LIBRARIES})
endforeach()

Another thing: I have always gotten rid of the goto FINISH statements in the process.

If you have any comments or follow up questions, I am happy to respond.

Edit:

I have tried wrapping this line

boost::wait_for_all(futures.begin(), futures.end());

in a try catch like this:

try {
    // Now that I have created all of the threads, I need to wait for them to finish.
        boost::wait_for_all(futures.begin(), futures.end());
    }
    catch (const std::exception& e)
    {
    // Just keep swimming
    cout << "Thread died" << endl;
}

And now the error messages to not get pushed up. Instead, the program just quits, without an error or any message at all. I am using return statements to kill the threaded method, but they should not be killing the main() thread - unless there is something else I am missing. Here is the code for the client that I am using to demo this "fix":

// This is for the futures functionality
#define BOOST_THREAD_VERSION 4
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

int main(int argv, char** argc)
{
    // I will put this whole thing into a while loop
    while (true)
    {
        cout << "Creating threads." << endl;
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        // Now I need to catch exceptions
        try {
            // Now that I have created all of the threads, I need to wait for them to finish.
            boost::wait_for_all(futures.begin(), futures.end());
        }
        catch (const std::exception& e)
        {
            // Just keep swimming
            cout << "Thread died" << endl;
        }
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    log_packet payload;
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");
    //cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
    int siz = payload.ByteSize()+4;
    char *pkt = new char [siz];
    google::protobuf::io::ArrayOutputStream aos(pkt,siz);
    CodedOutputStream *coded_output = new CodedOutputStream(&aos);
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    int host_port= 1101;
    char* host_name="127.0.0.1";

    struct sockaddr_in my_addr;

    char buffer[1024];
    int bytecount;
    int buffer_len=0;

    int hsock;
    int * p_int;
    int err;

    hsock = socket(AF_INET, SOCK_STREAM, 0);
    if(hsock == -1){
        printf("Error initializing socket %d\n",errno);
        //goto FINISH;
        return;
    }

    p_int = (int*)malloc(sizeof(int));
    *p_int = 1;

    if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
        printf("Error setting options %d\n",errno);
        free(p_int);
        //goto FINISH;
        return;
    }
    free(p_int);

    my_addr.sin_family = AF_INET ;
    my_addr.sin_port = htons(host_port);

    memset(&(my_addr.sin_zero), 0, 8);
    my_addr.sin_addr.s_addr = inet_addr(host_name);
    if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
        if((err = errno) != EINPROGRESS){
            fprintf(stderr, "Error connecting socket %d\n", errno);
            //goto FINISH;
            return;
        }
    }

    while (true)
    {
        if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
            // THIS is where the program dies.
            fprintf(stderr, "Error sending data %d\n", errno);
            //goto FINISH;
            break;
        }
        //printf("Sent bytes %d\n", bytecount);
        //usleep(1);
        sleepApp(1000);
        cout << "Thread slept for 1 second." << endl;
    }
}

If the return statements are that bad, how can I replace them?

11
  • It's unclear to me why would the client crash if the server goes offline? Does doSocketswag throw if the socket dies? If so, can't you catch it? Commented Jul 14, 2019 at 18:03
  • The client is in this code block: if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending data %d\n", errno); goto FINISH; } and it tells me that it errors out, gives me an error number, and then quits the program. I should also say that the above pasted code block is in a while(true) loop, but that does not change the general logic. Also, in my personal code, I have a lot of other logic going on, but the original SO post linked above provides the same idea for proof of concept. I should also mention that I got rid of the goto FINISH in my original program. Commented Jul 14, 2019 at 21:46
  • I know where the code is failing. I do not know what to do to fix it. A debugger is not going to help me figure out what to replace code with @MaxVollmer. Commented Jul 15, 2019 at 0:07
  • Maybe I am too tired, but even after rereading your question I don't see any information on where the code is failing. Can you either add it or make it more clear? Commented Jul 15, 2019 at 0:18
  • Probably a combination of all of the above... I have been there too. Commented Jul 15, 2019 at 0:23

1 Answer 1

0

Here we go guys

After reviewing the link provided by Jeremy Friesner, I discovered that this issue is most definitely not original. It turns out that sockets are able to publish signals to interrupt a cpp program to require handling by the user. The link provided explains how that works. Basically, I simply had to handle an interrupt condition in order for the program to continue: the logic in the program was correct, but I did not know enough about sockets to know to ask about SIGPIPE and SIG_IGN to solve my problem. I have displayed below 2 examples of how to solve the problem, similar to Jeremy Friesner‘s link. The multi-threading also works properly now, and it is super awesome to watch. 😊

Here is the example where I directly copy the solution from the link:

// This is for the futures functionality
//#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_FUTURE
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>
#include <signal.h>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

log_packet payload;
int siz;
char *pkt;
google::protobuf::io::ArrayOutputStream * aos;
CodedOutputStream *coded_output;

int host_port= 1101;
char* host_name="127.0.0.1";

struct sockaddr_in my_addr;

char buffer[1024];
int bytecount;
int buffer_len=0;

int hsock;
int * p_int;
int err;

// This boolean controls whether of not the socket thread loops.
std::atomic_bool dosockets{true};

// This is the mutex for printing
std::mutex printlock;

// Try to do client socket program without server.
int main(int argv, char** argc)
{
    // Registering the singal PIPE ignore
    signal(SIGPIPE, SIG_IGN);

    // We build the packet.
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");

    // Now I fill in the socket fields
    siz = payload.ByteSize()+4;
    pkt = new char [siz];
    aos = new google::protobuf::io::ArrayOutputStream(pkt,siz);
    coded_output = new CodedOutputStream(aos);

    // Now we do methods on the objects
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    // I will put this whole thing into a while loop
    while (true)
    {
        printlock.lock();
        cout << "Creating threads." << endl;
        printlock.unlock();
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        boost::wait_for_all(futures.begin(), futures.end());
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    // 
    while (dosockets)
    {
        printlock.lock();
        cout << "Waiting to try again." << endl;
        printlock.unlock();
        sleepApp(1000);

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
            printlock.lock();
            printf("Error initializing socket %d\n",errno);
            printlock.unlock();
            //goto FINISH;
            continue;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
            printlock.lock();
            printf("Error setting options %d\n",errno);
            printlock.unlock();
            free(p_int);
            //goto FINISH;
            continue;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
            if((err = errno) != EINPROGRESS)
            {
                printlock.lock();
                fprintf(stderr, "Error connecting socket %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                continue;
            }
        }

        while (true)
        {
            if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 )
            {
                printlock.lock();
                fprintf(stderr, "Error sending data %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                //break;
                close(hsock);
                break;
            }
            //printf("Sent bytes %d\n", bytecount);
            //usleep(1);
            sleepApp(1000);
            printlock.lock();
            cout << "Thread slept for 1 second." << endl;
            printlock.unlock();
        }
    }
}

And then here is the example where I create a callback function when the SIGPIPE signal gets raised:

// This is for the futures functionality
//#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_FUTURE
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>
#include <signal.h>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

log_packet payload;
int siz;
char *pkt;
google::protobuf::io::ArrayOutputStream * aos;
CodedOutputStream *coded_output;

int host_port= 1101;
char* host_name="127.0.0.1";

struct sockaddr_in my_addr;

char buffer[1024];
int bytecount;
int buffer_len=0;

int hsock;
int * p_int;
int err;

// This boolean controls whether of not the socket thread loops.
std::atomic_bool dosockets{true};

// This is the mutex for printing
std::mutex printlock;

// The signal pipe error
void my_handler(int input)
{
    // Simply print
    printlock.lock();
    cout << "I got into the pipe handler LOOK AT ME WOOO HOOO." << endl;
    printlock.unlock();
}

// Try to do client socket program without server.
int main(int argv, char** argc)
{
    // Registering the singal PIPE ignore
    struct sigaction sigIntHandler;
    sigIntHandler.sa_handler = my_handler;
    sigemptyset(&sigIntHandler.sa_mask);
    sigIntHandler.sa_flags = 0;
    sigaction(SIGPIPE, &sigIntHandler, NULL);

    // We build the packet.
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");

    // Now I fill in the socket fields
    siz = payload.ByteSize()+4;
    pkt = new char [siz];
    aos = new google::protobuf::io::ArrayOutputStream(pkt,siz);
    coded_output = new CodedOutputStream(aos);

    // Now we do methods on the objects
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    // I will put this whole thing into a while loop
    while (true)
    {
        printlock.lock();
        cout << "Creating threads." << endl;
        printlock.unlock();
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        boost::wait_for_all(futures.begin(), futures.end());
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    // 
    while (dosockets)
    {
        printlock.lock();
        cout << "Waiting to try again." << endl;
        printlock.unlock();
        sleepApp(1000);

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
            printlock.lock();
            printf("Error initializing socket %d\n",errno);
            printlock.unlock();
            //goto FINISH;
            continue;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
            printlock.lock();
            printf("Error setting options %d\n",errno);
            printlock.unlock();
            free(p_int);
            //goto FINISH;
            continue;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
            if((err = errno) != EINPROGRESS)
            {
                printlock.lock();
                fprintf(stderr, "Error connecting socket %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                continue;
            }
        }

        while (true)
        {
            if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 )
            {
                printlock.lock();
                fprintf(stderr, "Error sending data %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                //break;
                close(hsock);
                break;
            }
            //printf("Sent bytes %d\n", bytecount);
            //usleep(1);
            sleepApp(1000);
            printlock.lock();
            cout << "Thread slept for 1 second." << endl;
            printlock.unlock();
        }
    }
}

The server code remains unchanged from the original link provided in the beginning of the question.

Thank you guys for the comments. They directed me to my answer and a whole new area of cpp I did not know what a thing: signals.

(If you guys want to continue commenting, that is fine as well. I am considering this question as answered.)

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.