Boost asio, single TCP server, many clients

I am creating a TCP server with Boost.Asio that will accept connections from many clients, receive data, and send confirmations. I want to accept all clients but work with only one at a time, keeping all other transactions in a queue.

Example:

  1. Client1 connects
  2. Client2 connects
  3. Client1 sends data and asks for reply
  4. Client2 sends data and asks for reply
  5. Client2's request is put into queue
  6. Client1's data is read, server replies, end of transaction
  7. Client2's request is taken from the queue, server reads data, replies, end of transaction.

So this is something between an asynchronous server and a blocking server. I want to do just one thing at a time, but I also want to be able to store all client sockets and their requests in the queue.

I was able to create server-client communication with all the functionality I need, but only on a single thread: once the client disconnects, the server terminates as well. I don't really know how to start implementing what I described above. Should I start a new thread each time a connection is accepted? Should I use async_accept or a blocking accept?

I have read the boost::asio chat example, where many clients connect to a single server, but it has no queuing mechanism of the kind I need here.

I am aware that this post might be a bit confusing, but TCP servers are new to me, so I am not yet familiar with the terminology. There is also no source code to post because I am only asking for help with the concept of this project.

Heel answered 25/10, 2017 at 14:25 Comment(1)
We can't tell you whether you /should/ use blocking or async accept. From the looks of it you can do either. - Gurgitation

Just keep accepting.

You show no code, but it typically looks like

void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
        std::cout << "async_accept -> " << ec.message() << "\n";
        if (!ec) {
            std::make_shared<Connection>(std::move(socket_))->start();
            do_accept(); // THIS LINE
        }
    });
}

If you don't include the line marked // THIS LINE you will indeed not accept more than 1 connection.

If this doesn't help, please include some code we can work from.

For Fun, A Demo

This uses just standard library features for the non-network part.

Network Listener

The network part is as outlined before:

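#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>

// standard headers used across the whole demo (network, queue, workers, main)
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <istream>
#include <iterator>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>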

using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;

namespace Shared {
    using PostRequest = std::function<void(std::istream& is)>;
}

namespace Network {

    namespace ba = boost::asio;
    using ba::ip::tcp;
    using error_code = boost::system::error_code;

    using Shared::PostRequest;

    struct Connection : std::enable_shared_from_this<Connection> {
        Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}

        void process() {
            auto self = shared_from_this();
            ba::async_read(_s, _request, [this,self](error_code ec, size_t) {
                if (!ec || ec == ba::error::eof) {
                    std::istream reader(&_request);
                    _poster(reader);
                }
            });
        }

      private:
        tcp::socket   _s;
        ba::streambuf _request;
        PostRequest   _poster;
    };

    struct Server {

        Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}

        void run_for(Clock::duration d = 30s) {
            _stop.expires_from_now(d);
            _stop.async_wait([this](error_code ec) { if (!ec) _svc.post([this] { _a.close(); }); });

            _a.listen();

            do_accept();

            _svc.run();
        }
      private:
        void do_accept() {
            _a.async_accept(_s, [this](error_code ec) {
                if (!ec) {
                    std::make_shared<Connection>(std::move(_s), _poster)->process();
                    do_accept();
                }
            });
        }

        unsigned short            _port;
        PostRequest               _poster;

        ba::io_service            _svc;
        ba::high_resolution_timer _stop { _svc };
        tcp::acceptor             _a { _svc, tcp::endpoint {{}, _port } };
        tcp::socket               _s { _svc };
    };
}

The only "connection" to the work service part is the PostRequest handler that is passed to the server at construction:

Network::Server server(6767, handler);

I've also opted for async operations, so we can have a timer to stop the service, even though we do not use any threads:

server.run_for(3s); // this blocks

The Work Part

This is completely separate, and will use threads. First, let's define a Request, and a thread-safe Queue:

namespace Service {
    struct Request {
        std::vector<char> data; // or whatever you read from the sockets...
    };

    Request parse_request(std::istream& is) {
        Request result;
        result.data.assign(std::istream_iterator<char>(is), {});
        return result;
    }

    struct Queue {
        Queue(size_t max = 50) : _max(max) {}

        void enqueue(Request req) {
            std::unique_lock<std::mutex> lk(mx);
            cv.wait(lk, [this] { return _queue.size() < _max; });
            _queue.push_back(std::move(req));

            cv.notify_one();
        }

        Request dequeue(Clock::time_point deadline) {
            Request req;

            {
                std::unique_lock<std::mutex> lk(mx);
                _peak = std::max(_peak, _queue.size());
                if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
                    req = std::move(_queue.front());
                    _queue.pop_front();
                    cv.notify_one();
                } else {
                    throw std::range_error("dequeue deadline");
                }
            }

            return  req;
        }

        size_t peak_depth() const {
            std::lock_guard<std::mutex> lk(mx);
            return _peak;
        }

      private:
        mutable std::mutex mx;
        mutable std::condition_variable cv;

        size_t _max = 50;
        size_t _peak = 0;
        std::deque<Request> _queue;
    };

This is nothing special, and doesn't actually use threads yet. Let's make a worker function that accepts a reference to a queue (more than 1 worker can be started if so desired):

    void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
        auto const deadline = Clock::now() + d;

        while(true) try {
            auto r = queue.dequeue(deadline);
            (std::cout << "Worker " << name << " handling request '").write(r.data.data(), r.data.size()) << "'\n";
        }
        catch(std::exception const& e) {
            std::cout << "Worker " << name << " got " << e.what() << "\n";
            break;
        }
    }
}

The Main Driver

Here's where the Queue gets instantiated and both the network server and some worker threads are started:

int main() {
    Service::Queue queue;

    auto handler = [&](std::istream& is) {
            queue.enqueue(Service::parse_request(is));
        };

    Network::Server server(6767, handler);

    std::vector<std::thread> pool;
    pool.emplace_back([&queue] { Service::worker("one", queue, 6s); });
    pool.emplace_back([&queue] { Service::worker("two", queue, 6s); });

    server.run_for(3s); // this blocks

    for (auto& thread : pool)
        if (thread.joinable())
            thread.join();

    std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n";
}
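
The question asks for strictly one transaction at a time. With the design above, that comes down to starting a single worker instead of two. The following stand-alone sketch uses only the standard library and hypothetical names (`SerialQueue`, `run_serial`, plain strings in place of `Service::Request`) to show that one consumer thread handles queued requests in arrival order. It is an illustration under those assumptions, not part of the demo itself:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal queue: unbounded, closed explicitly instead of using a deadline.
class SerialQueue {
  public:
    void enqueue(std::string req) {
        {
            std::lock_guard<std::mutex> lk(mx_);
            queue_.push_back(std::move(req));
        }
        cv_.notify_one();
    }

    // Signals that no more requests will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until a request is available; returns false once closed and drained.
    bool dequeue(std::string& out) {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty())
            return false;
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

  private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
    bool closed_ = false;
};

// Enqueues all requests, lets ONE worker thread handle them, and returns the handling order.
std::vector<std::string> run_serial(std::vector<std::string> const& requests) {
    SerialQueue queue;
    std::vector<std::string> handled;

    std::thread worker([&] {
        std::string req;
        while (queue.dequeue(req))
            handled.push_back(req); // only this thread touches `handled` until join()
    });

    for (auto const& r : requests)
        queue.enqueue(r);
    queue.close();
    worker.join();
    return handled;
}
```

Calling `run_serial({"client1", "client2"})` returns the requests in the same order they were enqueued, because a single consumer pops from the front of a FIFO queue.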

Live Demo

See It Live On Coliru

With a test load looking like this:

for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world"
do
     netcat 127.0.0.1 6767 <<< "$a" || echo "not sent: '$a'"&
done
wait

It prints something like this (the two workers write to std::cout concurrently, so their lines can interleave):

Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6
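
The words in the output above run together ('brownfox') because `parse_request` reads through `std::istream_iterator<char>`, which uses formatted extraction and therefore skips whitespace. If the payload should be kept byte for byte, `std::istreambuf_iterator<char>` is the usual alternative. A minimal comparison, with `read_skipping_ws` and `read_raw` as hypothetical helper names that are not part of the demo:

```cpp
#include <istream>
#include <iterator>
#include <sstream>
#include <string>

// Formatted extraction: spaces and newlines are dropped, as in the demo's parse_request.
std::string read_skipping_ws(std::istream& is) {
    return std::string(std::istream_iterator<char>(is), std::istream_iterator<char>());
}

// Unformatted extraction: every byte of the stream is kept.
std::string read_raw(std::istream& is) {
    return std::string(std::istreambuf_iterator<char>(is), std::istreambuf_iterator<char>());
}
```

Given a stream containing "brown fox" followed by a newline, the first helper yields "brownfox" while the second keeps the space and the newline.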
Gurgitation answered 25/10, 2017 at 14:30 Comment(1)
This is the correct answer to at least 30% of all asio questions :) - Grimbal

The includes you need (some may be unnecessary):

boost/asio.hpp, boost/thread.hpp, boost/asio/io_service.hpp

boost/asio/spawn.hpp, boost/asio/write.hpp, boost/asio/buffer.hpp

boost/asio/ip/tcp.hpp, iostream, stdlib.h, array, string

vector, string.h, stdio.h, process.h, iterator

#include <list>                                             //for std::list below (not part of the include list above)

using namespace boost::asio;
using namespace boost::asio::ip;

io_service ioservice;

tcp::endpoint sim_endpoint{ tcp::v4(), 4066 };              //{which connectiontype, portnumber}
tcp::acceptor sim_acceptor{ ioservice, sim_endpoint };
std::list<tcp::socket> sim_sockets;                         //std::list never relocates its elements, so pointers handed to threads stay valid


void do_write(tcp::socket* socket)                          //socket points at this client's entry in sim_sockets
{
    char chData[32000];                                     //local buffer: a static one would be shared by all client threads
    std::string sBuf = "Received!\r\n";

    while (true)                                            //to stay connected with putty or something
    {
        boost::system::error_code error;

        std::size_t iErgebnis = socket->read_some(boost::asio::buffer(chData), error);      //recv data from client
        if (error || iErgebnis == 0)                                                        //EOF or error: the client is gone
            break;

        printf("%d bytes received from client : \n%.*s\n\n", (int)iErgebnis, (int)iErgebnis, chData);
        boost::asio::write(*socket, boost::asio::buffer(sBuf), error);                      //send data to client
        if (error)
            break;
    }

    boost::system::error_code ec;
    socket->shutdown(tcp::socket::shutdown_send, ec);                                       //close the socket when no more data arrives
    if (ec)
    {
        printf("shutdown error");                                                           // An error occurred.
    }
}

void do_accept(yield_context yield)
{
    while (1)                                                   //endless loop to accept limitless clients
    {
        sim_sockets.emplace_back(ioservice);                    //create the socket for the next client
        sim_acceptor.async_accept(sim_sockets.back(), yield);   //waits here to accept a client

        boost::thread dosome(do_write, &sim_sockets.back());    //when accepted, starts the thread do_write for that socket
        dosome.detach();                                        //the thread runs on its own and ends when the client disconnects
    }
}

int main()
{
    sim_acceptor.listen();
    spawn(ioservice, do_accept);            //here you can learn more about Coroutines https://theboostcpplibraries.com/boost.coroutine
    ioservice.run();                        //from here you jump to do_accept
    getchar(); 
}
Zora answered 17/4, 2018 at 6:25 Comment(2)
Shouldn't we somehow manage the lifetime of the thread? Is this method really safe? - Heel
If you close the connection, the while loop exits and then the thread ends. - Zora
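
On the lifetime question raised in the comments: one common way to keep per-client threads under control is to store their handles and join them before shutdown, rather than letting the thread objects go out of scope. The sketch below is only an illustration. It uses `std::thread` instead of `boost::thread`, the names `ThreadGroup` and `run_sessions` are hypothetical, and the per-client work is a stand-in counter rather than socket I/O:

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: owns every thread it starts and joins them all on destruction.
class ThreadGroup {
  public:
    void start(std::function<void()> job) {
        threads_.emplace_back(std::move(job));
    }

    void join_all() {
        for (auto& t : threads_)
            if (t.joinable())
                t.join();
    }

    ~ThreadGroup() { join_all(); }

  private:
    std::vector<std::thread> threads_;
};

// Starts one thread per (pretend) client and returns how many sessions finished.
int run_sessions(int clients) {
    std::atomic<int> finished{0};
    {
        ThreadGroup group;
        for (int i = 0; i < clients; ++i)
            group.start([&finished] { ++finished; }); // stand-in for the read/reply loop
    } // ~ThreadGroup joins here, so no thread outlives `finished`
    return finished.load();
}
```

Because the group joins in its destructor, `run_sessions(4)` only returns after all four stand-in sessions have completed.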
