Working with boost::asio::streambuf
Asked Answered
H

1

13

Looking for a boost::asio (and with himself boost) decided to write asynchronous server. To store incoming data I use boost::asio::streambuf. Here I have a problem. When I receive a second message from the client and subsequent I see that in the buffer contains a data from previous messages. Although I call Consume method at the input buffer. What's wrong?

class tcp_connection
// Using shared_ptr and enable_shared_from_this 
// because we want to keep the tcp_connection object alive 
// as long as there is an operation that refers to it.
: public boost::enable_shared_from_this<tcp_connection>
{
...

boost::asio::streambuf receive_buffer;

boost::asio::io_service::strand strand;
}

...

void tcp_connection::receive()
{
// Read the response status line. The response_ streambuf will
// automatically grow to accommodate the entire line. The growth may be
// limited by passing a maximum size to the streambuf constructor.
boost::asio::async_read_until(m_socket, receive_buffer, "\r\n",
    strand.wrap(boost::bind(&tcp_connection::handle_receive, shared_from_this()/*this*/,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred)));

}


void tcp_connection::handle_receive(const boost::system::error_code& error, 
std::size_t bytes_transferred)
{

if (!error)
{
    // process the data

    /*  boost::asio::async_read_until remarks

    After a successful async_read_until operation, 
    the streambuf may contain additional data beyond the delimiter.
    An application will typically leave that data in the streambuf for a
    subsequent async_read_until operation to examine.
    */

    /* didn't work      
    std::istream is(&receive_buffer);
    std::string line;
    std::getline(is, line); 
    */


    // clean up incomming buffer but it didn't work 
    receive_buffer.consume(bytes_transferred);  

    receive(); 

}
else if (error != boost::asio::error::operation_aborted)
{
    std::cout << "Client Disconnected\n";

    m_connection_manager.remove(shared_from_this());
}
}
Habitforming answered 12/2, 2015 at 13:7 Comment(0)
P
37

Either using a std::istream and reading from it, such as by std::getline(), or explicitly invoking boost::asio::streambuf::consume(n), will remove data from the input sequence.
If the application is performing either of these and subsequent read_until() operations results in duplicated data in receive_buffer's input sequence, then the duplicated data is likely originating from the remote peer. If the remote peer is writing to the socket and directly using a streambuf's input sequence, then the remote peer needs to explicitly invoke consume() after each successful write operation.


As noted in the documentation, successful read_until() operations may contain additional data beyond the delimiter, including additional delimiters. For instance, if "a@b@" is written to a socket, a read_until() operation using '@' as a delimiter may read and commit "a@b@" to the streambuf's input sequence. However, the operation will indicate that the amount of bytes transferred is that up to and including the first delimiter. Thus, bytes_transferred would be 2 and streambuf.size() would be 4. After 2 bytes have been consumed, the streambuf's input sequence would contain "b@", and a subsequent call to read_until() will return immediately, as the streambuf already contains the delimiter.

Here is a complete example demonstrating streambuf usage for reading and writing, and how the input sequence is consumed:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

std::string make_string(boost::asio::streambuf& streambuf)
{
 return {buffers_begin(streambuf.data()), 
         buffers_end(streambuf.data())};
}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, boost::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop));
  io_service.run();

  // Write to server.
  boost::asio::streambuf write_buffer;
  std::ostream output(&write_buffer);
  output << "a@"
            "b@";
  write(server_socket, write_buffer.data());
  std::cout << "Wrote: " << make_string(write_buffer) << std::endl;
  assert(write_buffer.size() == 4);  // Data not consumed.

  // Read from the client.
  boost::asio::streambuf read_buffer;

  // Demonstrate consuming via istream.
  {
    std::cout << "Read" << std::endl;
    auto bytes_transferred = read_until(client_socket, read_buffer, '@');
    // Verify that the entire write_buffer (data pass the first delimiter) was
    // read into read_buffer.
    auto initial_size = read_buffer.size();
    assert(initial_size == write_buffer.size());

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::istream input(&read_buffer);
    std::string line;
    getline(input, line, '@'); // Consumes from the streambuf.
    assert("a" == line); // Note getline discards delimiter.
    std::cout << "Read consumed: " << line << "@" << std::endl;
    assert(read_buffer.size() == initial_size - bytes_transferred);
  }

  // Write an additional message to the server, but only consume 'a@'
  // from write buffer.  The buffer will contain 'b@c@'.
  write_buffer.consume(2);
  std::cout << "Consumed write buffer, it now contains: " <<
                make_string(write_buffer) << std::endl;
  assert(write_buffer.size() == 2);
  output << "c@";
  assert(write_buffer.size() == 4);
  write(server_socket, write_buffer.data());
  std::cout << "Wrote: " << make_string(write_buffer) << std::endl;

  // Demonstrate explicitly consuming via the streambuf.
  {
    std::cout << "Read" << std::endl;
    auto initial_size = read_buffer.size();
    auto bytes_transferred = read_until(client_socket, read_buffer, '@');
    // Verify that the read operation did not attempt to read data from
    // the socket, as the streambuf already contained the delimiter.
    assert(initial_size == read_buffer.size());

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::string line(
        boost::asio::buffers_begin(read_buffer.data()),
        boost::asio::buffers_begin(read_buffer.data()) + bytes_transferred);
    assert("b@" == line);
    assert(read_buffer.size() == initial_size); // Nothing consumed.
    read_buffer.consume(bytes_transferred); // Explicitly consume.
    std::cout << "Read consumed: " << line << std::endl;
    assert(read_buffer.size() == 0);
  }

  // Read again.
  {
    std::cout << "Read" << std::endl;
    read_until(client_socket, read_buffer, '@');

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::istream input(&read_buffer);
    std::string line;
    getline(input, line, '@'); // Consumes from the streambuf.
    assert("b" == line); // Note "b" is expected and not "c".
    std::cout << "Read consumed: " << line << "@" << std::endl;
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
  }
}

Output:

Wrote: a@b@
Read
Read buffer contains: a@b@
Read consumed: a@
Consumed write buffer, it now contains: b@
Wrote: b@c@
Read
Read buffer contains: b@
Read consumed: b@
Read
Read buffer contains: b@c@
Read consumed: b@
Read buffer contains: c@
Podite answered 13/2, 2015 at 16:29 Comment(6)
Thank you, this is a very good example of working with streambuf. I found why I see data from previous messages after sending. On the client, function Send() uses a global buffer streambuf (send_buffer in my example). How do I do that every call of Send() used own streambuf object ? Something like boost::make_shared? This is the right approach? Do you have a good example of server and client using boost::asio? That's what in the documentation of Boost d oes not suit me, there used buffers of constant length, and I would like to see an example of using streambuf.Habitforming
Awesome, very helpful answer. I wish the Boost people would have you write all their documentation! ;-)Landbert
How did you access the input sequence without ever commiting (commit())? data() documentation says data() returns list of input buffers, and according to commit() documentation, it actually copies the characters from output sequence to input sequence.Fournier
@Cengiz When using Asio operations that accept the streambuf (not a buffer representing on of its sequences), or when using stream objects that use a streambuf, such as std::ostream, the underlying input and output sequences will be properly managed. This answer may provide more details. Also, the documentation for commit() mentions moving, not copying, data. The current implementation uses a single contiguous array, where a commit() advances both the output sequence begin pointer and input sequence end pointer by n characters.Podite
@Tanner, thanks for the quick answer. Since the question is a bit old, I expected to wait a little longer.Fournier
@Tanner If you are still around. Please see: #46981427Psalms

© 2022 - 2024 — McMap. All rights reserved.