Just keep accepting.
You show no code, but it typically looks like
// Waits for one incoming connection. On success the accepted socket is handed
// to a new Connection and the acceptor is re-armed for the next client; on
// error the accept chain simply ends.
void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
        std::cout << "async_accept -> " << ec.message() << "\n";
        if (ec)
            return; // nothing accepted: do not re-arm

        std::make_shared<Connection>(std::move(socket_))->start();
        do_accept(); // THIS LINE
    });
}
If you don't include the line marked // THIS LINE
you will indeed not accept more than 1 connection.
If this doesn't help, please include some code we can work from.
For Fun, A Demo
This uses just standard library features for the non-network part.
Network Listener
The network part is as outlined before:
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <istream>
using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;
// Types shared between the network layer and the code that consumes requests.
namespace Shared {
// Callback type: invoked with an input stream and returns nothing. The network
// code below calls it with a stream over the bytes read from a connection.
using PostRequest = std::function<void(std::istream& is)>;
}
namespace Network {
namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;
using Shared::PostRequest;
struct Connection : std::enable_shared_from_this<Connection> {
Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}
void process() {
auto self = shared_from_this();
ba::async_read(_s, _request, [this,self](error_code ec, size_t) {
if (!ec || ec == ba::error::eof) {
std::istream reader(&_request);
_poster(reader);
}
});
}
private:
tcp::socket _s;
ba::streambuf _request;
PostRequest _poster;
};
/// Single-threaded TCP listener: accepts connections on the given port and
/// gives every accepted socket its own Connection, which shares the poster.
struct Server {
    /// @param port    TCP port to listen on (stored as unsigned short).
    /// @param poster  callback passed on to every Connection.
    Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}

    /// Runs the io_service on the calling thread. A timer set to expire after
    /// `d` closes the acceptor, which ends the accept chain.
    void run_for(Clock::duration d = 30s) {
        _stop.expires_from_now(d);
        _stop.async_wait([this](error_code ec) {
            if (ec)
                return; // timer wait failed or was cancelled: leave the acceptor open
            _svc.post([this] { _a.close(); });
        });

        _a.listen();
        do_accept();
        _svc.run(); // blocks the caller while handlers are being run
    }

  private:
    /// Accepts one connection; on success starts processing it and re-arms.
    void do_accept() {
        _a.async_accept(_s, [this](error_code ec) {
            if (ec)
                return; // e.g. acceptor closed: stop accepting

            std::make_shared<Connection>(std::move(_s), _poster)->process();
            do_accept();
        });
    }

    // Members are initialized in declaration order: _a reads _port and _svc,
    // and _stop and _s read _svc, so keep this order.
    unsigned short _port;
    PostRequest _poster;
    ba::io_service _svc;
    ba::high_resolution_timer _stop { _svc };
    tcp::acceptor _a { _svc, tcp::endpoint {{}, _port } };
    tcp::socket _s { _svc };
};
}
The only "connection" to the work service part is the PostRequest
handler that is passed to the server at construction:
Network::Server server(6767, handler);
I've also opted for async operations, so we can have a timer to stop the service, even though we do not use any threads:
server.run_for(3s); // this blocks
The Work Part
This is completely separate, and will use threads. First, let's define a Request
, and a thread-safe Queue
:
namespace Service {
// One unit of work handed from the network side to the workers: the payload
// characters of a single request.
struct Request {
std::vector<char> data; // or whatever you read from the sockets...
};
/// Builds a Request from every character that can be extracted from `is`.
///
/// std::istream_iterator<char> uses formatted extraction (operator>>), so
/// with the stream's default skipws flag whitespace characters are skipped
/// and do not end up in the request data.
Request parse_request(std::istream& is) {
    using CharIt = std::istream_iterator<char>;

    CharIt const first(is);
    CharIt const last; // a default-constructed iterator marks end-of-stream

    Request parsed;
    parsed.data.assign(first, last);
    return parsed;
}
struct Queue {
Queue(size_t max = 50) : _max(max) {}
void enqueue(Request req) {
std::unique_lock<std::mutex> lk(mx);
cv.wait(lk, [this] { return _queue.size() < _max; });
_queue.push_back(std::move(req));
cv.notify_one();
}
Request dequeue(Clock::time_point deadline) {
Request req;
{
std::unique_lock<std::mutex> lk(mx);
_peak = std::max(_peak, _queue.size());
if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
req = std::move(_queue.front());
_queue.pop_front();
cv.notify_one();
} else {
throw std::range_error("dequeue deadline");
}
}
return req;
}
size_t peak_depth() const {
std::lock_guard<std::mutex> lk(mx);
return _peak;
}
private:
mutable std::mutex mx;
mutable std::condition_variable cv;
size_t _max = 50;
size_t _peak = 0;
std::deque<Request> _queue;
};
This is nothing special, and doesn't actually use threads yet. Let's make a worker function that accepts a reference to a queue (more than 1 worker can be started if so desired):
/// Worker loop: repeatedly takes a request from `queue` and logs it.
///
/// @param name   label used in the log output.
/// @param queue  queue to consume from; may be shared by several workers.
/// @param d      how long, from the moment of the call, the worker keeps
///               waiting for requests.
///
/// The loop ends when dequeue() throws (which it does once the deadline has
/// passed with nothing queued); the exception message is logged and the
/// function returns.
///
/// Every log line is assembled into one string and written with a single
/// insertion. Writing a line with several chained insertions lets the pieces
/// from concurrently running workers be spliced into one another.
void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
    auto const deadline = Clock::now() + d;

    while (true) {
        try {
            auto r = queue.dequeue(deadline);

            std::string line = "Worker " + name + " handling request '";
            line.append(r.data.data(), r.data.size());
            line += "'\n";
            std::cout << line;
        } catch (std::exception const& e) {
            std::cout << ("Worker " + name + " got " + e.what() + "\n");
            break;
        }
    }
}
}
The main
Driver
Here's where the Queue gets instantiated and both the network server as well as some worker threads are started:
// Wires the pieces together: one queue, a server that feeds it, and two
// worker threads that drain it.
int main() {
    Service::Queue queue;

    // Called by the network side for every request stream: parse and enqueue.
    auto handler = [&queue](std::istream& in) {
        queue.enqueue(Service::parse_request(in));
    };

    Network::Server server(6767, handler);

    // Start the workers; each one gives up 6s after it was started.
    std::vector<std::thread> pool;
    for (char const* label : { "one", "two" })
        pool.emplace_back([&queue, label] { Service::worker(label, queue, 6s); });

    server.run_for(3s); // this blocks

    for (auto& t : pool) {
        if (t.joinable())
            t.join();
    }

    std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n";
}
Live Demo
See It Live On Coliru
With a test load looking like this:
# Send each message to the server on its own, in parallel; report any message
# that could not be delivered, then wait for all background jobs to finish.
messages=("hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world")
for msg in "${messages[@]}"
do
    netcat 127.0.0.1 6767 <<< "$msg" || echo "not sent: '$msg'" &
done
wait
It prints something like:
Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6