Boost::asio - how to interrupt a blocked tcp server thread?

5

29

I'm working on a multithreaded application in which one thread acts as a tcp server which receives commands from a client. The thread uses a Boost socket and acceptor to wait for a client to connect, receives a command from the client, passes the command to the rest of the application, then waits again. Here's the code:

void ServerThreadFunc()
{
    using boost::asio::ip::tcp;
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port_no));

    for (;;)
    {
        //  listen for command connection
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        //  connected; receive command
        boost::array<char,256> msg_buf;
        socket.receive(boost::asio::buffer(msg_buf));

        //  do something with received bytes here
    }
}

This thread spends most of its time blocked on the call to acceptor.accept(). At the moment, the thread only gets terminated when the application exits. Unfortunately, this causes a crash after main() returns - I believe because the thread tries to access the app's logging singleton after the singleton has been destroyed. (It was like that when I got here, honest guv.)

How can I shut this thread down cleanly when it's time for the application to exit? I've read that a blocking accept() call on a raw socket can be interrupted by closing the socket from another thread, but this doesn't appear to work on a Boost socket. I've tried converting the server logic to asynchronous i/o using the Boost asynchronous tcp echo server example, but that just seems to exchange a blocking call to acceptor::accept() for a blocking call to io_service::run(), so I'm left with the same problem: a blocked call which I can't interrupt. Any ideas?

Priebe answered 25/6, 2012 at 14:15 Comment(3)
There is always io_service::stop, it can be called from another thread.Leupold
The whole idea about having an event loop is that everything happens in the event loop, i.e. run(). Which means you don't have a second thread to worry about in the first place. (though sometimes it makes sense to do certain stuff outside the event loop, like demanding computations)Terrilyn
@AmbrozBizjak, I agree. Using old-school socket handling with select(), you can do everything necessary in one loop. select() marks a listening TCP socket as ready to read when the number of completed connections is non-zero. So, in this main select loop, you can handle all accepts, reads, and writes in one place. W. Richard Stevens' book "UNIX Network Programming, Volume 1" covers this.Granlund
K
32

In short, there are two options:

  • Change code to be asynchronous (acceptor::async_accept() and async_read), run within the event loop via io_service::run(), and cancel via io_service::stop().
  • Force blocking calls to interrupt with lower level mechanics, such as signals.

I would recommend the first option, as it is more likely to be portable and easier to maintain. The important concept to understand is that io_service::run() only blocks as long as there is pending work. When io_service::stop() is invoked, it will try to cause all threads blocked on io_service::run() to return as soon as possible; it will not interrupt synchronous operations, such as acceptor::accept() and socket::receive(), even if the synchronous operations are invoked within the event loop. It is important to note that io_service::stop() is a non-blocking call, so synchronization with threads that were blocked on io_service::run() must use another mechanism, such as thread::join().

Here is an example that runs for 10 seconds and listens on port 8080:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void StartAccept( boost::asio::ip::tcp::acceptor& );

void ServerThreadFunc( boost::asio::io_service& io_service )
{
  using boost::asio::ip::tcp;
  tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), 8080 ) );

  // Add a job to start accepting connections.
  StartAccept( acceptor );

  // Process event loop.
  io_service.run();

  std::cout << "Server thread exiting." << std::endl;
}

void HandleAccept( const boost::system::error_code& error,
                   boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
                   boost::asio::ip::tcp::acceptor& acceptor )
{
  // If there was an error, then do not add any more jobs to the service.
  if ( error )
  {
    std::cout << "Error accepting connection: " << error.message() 
              << std::endl;
    return;
  }

  // Otherwise, the socket is good to use.
  std::cout << "Doing things with socket..." << std::endl;

  // Perform async operations on the socket.

  // Done using the socket, so start accepting another connection.  This
  // will add a job to the service, preventing io_service::run() from
  // returning.
  std::cout << "Done using socket, ready for another connection." 
            << std::endl;
  StartAccept( acceptor );
}

void StartAccept( boost::asio::ip::tcp::acceptor& acceptor )
{
  using boost::asio::ip::tcp;
  boost::shared_ptr< tcp::socket > socket(
                                new tcp::socket( acceptor.get_io_service() ) );

  // Add an accept call to the service.  This will prevent io_service::run()
  // from returning.
  std::cout << "Waiting on connection" << std::endl;
  acceptor.async_accept( *socket,
    boost::bind( HandleAccept,
      boost::asio::placeholders::error,
      socket,
      boost::ref( acceptor ) ) );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Create server thread that will start accepting connections.
  boost::thread server_thread( ServerThreadFunc, boost::ref( io_service ) );

  // Sleep for 10 seconds, then shutdown the server.
  std::cout << "Stopping service in 10 seconds..." << std::endl;
  boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
  std::cout << "Stopping service now!" << std::endl;

  // Stopping the io_service is a non-blocking call.  The threads that are
  // blocked on io_service::run() will try to return as soon as possible, but
  // they may still be in the middle of a handler.  Thus, join the server
  // thread to wait until it has actually exited.
  io_service.stop();

  std::cout << "Waiting on server thread..." << std::endl;
  server_thread.join();
  std::cout << "Done waiting on server thread." << std::endl;

  return 0;
}

While running, I opened two connections. Here is the output:

Stopping service in 10 seconds...
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Stopping service now!
Waiting on server thread...
Server thread exiting.
Done waiting on server thread.
Kandi answered 25/6, 2012 at 18:35 Comment(3)
Brilliant - exactly the code and explanation I needed. Thanks!Priebe
you deserve +100 rep :-)Lowndes
I'd add to this the option of connecting to the server from within the server to complete the accept: an example using this approach in a completely synchronous Asio serverAdmittance
C
4

When you receive an event that it's time to exit, you can call acceptor.cancel(), which will cancel the pending accept (with an error code of operation_canceled). On some systems, you might also have to close() the acceptor to be safe.

Carcajou answered 25/6, 2012 at 15:51 Comment(1)
Neither acceptor::cancel() nor acceptor::close() will interrupt synchronous operations. Those calls will only cause outstanding asynchronous operations to be ready to run within the service event loop with an error of operation_canceled.Kandi
P
4

If it comes to it, you could open a temporary client connection to it on localhost - that will wake it up. You could even send it a special message so that you can shut down your server from the pub - there should be an app for that:)

Paratuberculosis answered 25/6, 2012 at 16:4 Comment(1)
This was actually the only way I could exit a server thread blocked on acceptor.accept(): having the server send a message to itself made accept() return.Freaky
D
1

Simply call shutdown() on the acceptor's native handle with the SHUT_RD option to cancel the pending receive (accept) operation.

Dibbrun answered 30/12, 2016 at 18:53 Comment(0)
C
-1

The accepted answer is not exactly correct. In fact, @JohnYu answered correctly.

Using the blocking API of ASIO is much like using the BSD sockets API that the ASIO library wraps in its classes.

The problem is that the boost::asio::ip::tcp::acceptor class does not provide shutdown() functionality, so you must do it using the "old" sockets API.

Additional note: Make sure the acceptor, socket and io_service are not destroyed before all threads using them exit. In the following code, std::shared_ptr is used to keep shared resources alive, so a user of the ApplicationContext class can delete the ApplicationContext object and avoid a SEGFAULT crash.

Additional note: Pay attention to the Boost documentation: there are overloaded methods that throw an exception and ones that return an error code. The original poster's code used acceptor.accept(socket); without try/catch, which would cause the program to exit instead of a normal thread-routine exit and cleanup.

Here is the solution description:

#include <sys/socket.h> // declares ::shutdown() and SHUT_RDWR
// other includes ...

using boost::asio::ip::tcp;
using boost::asio::io_service;

class ApplicationContext {

    // Use shared pointers to extend the life of resources after ApplicationContext
    // is deleted, so running threads can still keep using the shared resources
    std::shared_ptr<tcp::acceptor> acceptor;
    std::shared_ptr<io_service> ioservice;

    // called `ServerThreadFunc` in question code example
    void AcceptLoopThreadRoutine(int port_no) {
        ioservice = std::make_shared<io_service>();
        acceptor = std::make_shared<tcp::acceptor>(*ioservice, tcp::endpoint(tcp::v4(), port_no));

        try {
            for (;;) {
                // listen for client connection
                tcp::socket socket(*ioservice);
                // Note boost::system::system_error is raised when using this overload
                acceptor->accept(socket);

                // connected receive some data ...
                // // boost::array<char,256> msg_buf;
                // // socket.receive(boost::asio::buffer(msg_buf));
                //  do something with received bytes here
            }
        } catch(std::exception const & exception) {
            // boost::system::system_error here indicates clean exit ;)
        }
    }

    void StopAcceptThread() {
        if(acceptor) {
            // boost::asio::ip::tcp::acceptor does not have shutdown() functionality
            // exposed, so we need to do it with this low-level approach
            int shutdown_status = shutdown(acceptor->native_handle(), SHUT_RDWR);
        }
    }

};

Also note that using signals to unblock the accept thread is a very nasty implementation, and a temporary client connection on localhost to unblock the accept thread is very awkward.

ASIO is here to help you accomplish everything in a single thread with callbacks. If you are mixing threads and ASIO, chances are your design is bad.

Additional note: Do not confuse shutdown() and close(). Some systems may allow you to use close() on the accept socket to unblock the accept loop, but this is not portable.

Condescension answered 13/6, 2020 at 7:50 Comment(3)
The AS in ASIO is for asynchronous. It is not limited to callbacks. In fact, callbacks are just one of the available ways for continuation: using different CompletionTokens you can get a std::future or use coroutines for asynchronous programming. Callable objects themselves can be bound to executors, which might invoke the provided callbacks concurrently in a different thread. So, you are very wrong about bad design.Southeast
@SergeyKolesnik you are talking about abstractions not relevant to the question. My answer is limited to the scope of the question. The question has concrete code requiring specific solution that is provided in the answer. The advice is sound and correct in this context.Condescension
Firstly, it is your opinion-based generalization regarding design with asio that is really misleading and need not be related to the question. Secondly, regarding your code: the use of shared_ptr is completely unjustified, and you do not show how and under which circumstances you would share ownership of the resources. A shared acceptor is not thread safe according to the documentation.Southeast
