Using boost::asio::io_service::post()
Asked Answered
R

2

4

First i asked this Running a function on the main thread from a boost thread and passing parameters to that function

so now i am trying this:

The following is a console c++ project where i perfectly simulated my big project

TestServicePost.cpp

#include "stdafx.h"
#include "SomeClass.h"


int _tmain(int argc, _TCHAR* argv[])
{
    SomeClass* s = new SomeClass();
    while(true)
    {
        s->update();
    }
    return 0;
}

SomeClass.h

#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <queue>

class ServiceNote
{
public:
    std::string getType()
    {
        std::stringstream typeSS;
        typeSS << "LamasaTech.MultiWall.PostNote." << (NoteType.compare("Normal") == 0 ? "Node" : "Header") << "." << Shape << "." << Colour;
        return typeSS.str();
    }
    int Action; 
    int CNoteId;    
    std::string Colour; 
    int NoteId; 
    std::string NoteType;   
    int SessionId;  
    std::string Shape;  
    std::string Style;  
    std::string Text;   
    int X;  
    int Y;  
};

class SomeClass
{
public:
    SomeClass();
    ~SomeClass();
    void update();

private:
    std::queue<ServiceNote> pendingNotes;
    void addToQueue(ServiceNote sn);
    void pollService(boost::asio::io_service* svc);
    int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId);
    boost::thread servicePoller;
};

SomeClass.cpp

#include "stdafx.h"
#include "SomeClass.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/asio/signal_set.hpp>

#define POLL_SERVICE = 0;
#define POLLING_WAIT_TIME 1000
#define SAVE_SESSION_EVERY 1800000

SomeClass::SomeClass()
{
    boost::asio::io_service io_servicePoller;
    io_servicePoller.run();
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller));
    /*boost::asio::io_service io_sessionSaver;
    boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM);
    signalsSaver.async_wait( boost::bind(&boost::asio::io_service::stop, &io_sessionSaver));
    sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/
}

SomeClass::~SomeClass()
{
}

void SomeClass::update()
{   
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

void SomeClass::addToQueue(ServiceNote sn)
{
    pendingNotes.push(sn);
}

void SomeClass::pollService(boost::asio::io_service* svc)
{
    int messageId = 1;
    while(true)
    {
        if(boost::this_thread::interruption_enabled() && boost::this_thread::interruption_requested())
            return;
        int currentId = messageId;
        messageId = getMessage(svc, "49", messageId);
        if(currentId == messageId)
            boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME));
    }
}

int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId)
{
    try
    {
        boost::asio::io_service io_service;

        // Get a list of endpoints corresponding to the server name.
        boost::asio::ip::tcp::resolver resolver(io_service);
        boost::asio::ip::tcp::resolver::query query("mw.rombus.com", "http");
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

        // Try each endpoint until we successfully establish a connection.
        boost::asio::ip::tcp::socket socket(io_service);
        boost::asio::connect(socket, endpoint_iterator);

        // Form the request. We specify the "Connection: close" header so that the
        // server will close the socket after transmitting the response. This will
        // allow us to treat all data up until the EOF as the content.
        boost::asio::streambuf request;
        std::ostream request_stream(&request);
        request_stream << "GET " "/Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n";
        request_stream << "Host: " << "mw.rombus.com" << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Send the request.
        boost::asio::write(socket, request);

        // Read the response status line. The response streambuf will automatically
        // grow to accommodate the entire line. The growth may be limited by passing
        // a maximum size to the streambuf constructor.
        boost::asio::streambuf response;
        boost::asio::read_until(socket, response, "\r\n");

        // Check that response is OK.
        std::istream response_stream(&response);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
        {
            //std::cout << "Invalid response\n";
            return messageId;
        }
        if (status_code != 200)
        {
            //std::cout << "Response returned with status code " << status_code << "\n";
            return messageId;
        }

        // Read the response headers, which are terminated by a blank line.
        boost::asio::read_until(socket, response, "\r\n\r\n");

        // Process the response headers.
        std::string header;
        std::string fullHeader = "";
        while (std::getline(response_stream, header) && header != "\r")
            fullHeader.append(header).append("\n");

        // Write whatever content we already have to output.
        std::string fullResponse = "";
        if (response.size() > 0)
        {
            std::stringstream ss;
            ss << &response;
            fullResponse = ss.str();
            try
            {
                boost::property_tree::ptree pt;
                boost::property_tree::read_json(ss, pt);
                ServiceNote sn;
                sn.Action =  pt.get<int>("Action");
                sn.CNoteId =  pt.get<int>("CNoteId");
                sn.Colour =  pt.get<std::string>("Colour");
                sn.NoteId =  pt.get<int>("NoteId");
                sn.NoteType =  pt.get<std::string>("NoteType");
                sn.SessionId =  pt.get<int>("SessionId");
                sn.Shape =  pt.get<std::string>("Shape");
                sn.Style =  pt.get<std::string>("Style");
                sn.Text =  pt.get<std::string>("Text");
                sn.X =  pt.get<int>("X");
                sn.Y =  pt.get<int>("Y");
                svc->post(boost::bind(&SomeClass::addToQueue, this, sn));
                //pendingNotes.push(sn);
            }
            catch (std::exception const& e)
            {
                std::string test = e.what();
                //std::cerr << e.what() << std::endl;
            }
            messageId++;
        }

        // Read until EOF, writing data to output as we go.
        std::string fullSth = "";
        boost::system::error_code error;
        while (boost::asio::read(socket, response,
                boost::asio::transfer_at_least(1), error))
        {
            std::ostringstream ss;
            ss << &response;
            fullSth = ss.str();
        }
        if (error != boost::asio::error::eof)
            throw boost::system::system_error(error);
    }
    catch (std::exception& e)
    {
        std::string test = e.what();
        std::cout << "Exception: " << e.what() << "\n";
    }
    return messageId;
}

but i get Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4., right after this line executes:

svc->post(boost::bind(&SomeClass::addToQueue, this, sn));

I couldn't define io_service as a class member so i can use it in the destructor ~SomeClass(), would appreciate help on that too

If io_service.post is not the best solution for me please recommend something, as you can see i have a constructor, destructor and an update method who is called every tick, i tried using this and the queue alone but it wasn't thread safe, is there an easy thread safe FIFO to use ?

Retaretable answered 9/12, 2012 at 7:49 Comment(0)
R
2

I figured out how to declare io_service as a class member:

boost::shared_ptr< boost::asio::io_service > io_servicePoller;

and in the constructor i did the following:

SomeClass::SomeClass()
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    io_servicePoller = io_service;
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller));
}

Some cleanup

SomeClass::~SomeClass()
{
    servicePoller.interrupt();
    io_servicePoller->stop();
    servicePoller.join();
}

and in update i called run which adds the stuff into the queue, then reads them in the while loop

void SomeClass::update()
{   
    io_servicePoller->run();
    io_servicePoller->reset();
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

and changed my members signature to void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

So what happens is:

  1. The app starts
  2. inits my class
  3. my class makes a service and starts the thread
  4. the thread fetches items from the service
  5. the main thread checks the io service queue and exuted it
  6. then it uses the queue

Thanks to Igor R. i couldn't have done it without him

and also http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 where i got how to make the shared pointer

Retaretable answered 9/12, 2012 at 9:39 Comment(2)
But that won't work as well. If you insist to use io_service::run() this way (i.e. to process only the items currently available in io_service queue and exit), you must call io_servicePoller->reset() prior to it -- otherwise run would work only for the 1st time.Abracadabra
But i added io_servicePoller->reset(); after io_servicePoller->run(); anyways because i respect you and i am sure you are more experienced then meRetaretable
A
4

In SomeClass constructor you actually do the following:

  1. Define a local io_service instance.
  2. Call its run() member-function, which returns immediately, because io_service has no work.
  3. Pass an address of the local object to another thread.

This certainly won't work.

Note that io_service::run() is a kind of "message loop", so it should block the calling thread. Don't call it in object constructor.

Abracadabra answered 9/12, 2012 at 9:2 Comment(3)
Thanks, I fixed it i will post my solution now :)Retaretable
@Shereef make another member-function, give io_service some work, and call run as the last statement (it will block): void run() { io_service io_servicePoller; io_service::work work(io_servicePoller); servicePoller = thread(bind(&SomeClass::pollService, this, &io_servicePoller)); io_servicePoller.run(); }Abracadabra
I don't understand what you wrote, but i posted the solution i implementedRetaretable
R
2

I figured out how to declare io_service as a class member:

boost::shared_ptr< boost::asio::io_service > io_servicePoller;

and in the constructor i did the following:

SomeClass::SomeClass()
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    io_servicePoller = io_service;
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller));
}

Some cleanup

SomeClass::~SomeClass()
{
    servicePoller.interrupt();
    io_servicePoller->stop();
    servicePoller.join();
}

and in update i called run which adds the stuff into the queue, then reads them in the while loop

void SomeClass::update()
{   
    io_servicePoller->run();
    io_servicePoller->reset();
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

and changed my members signature to void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

So what happens is:

  1. The app starts
  2. inits my class
  3. my class makes a service and starts the thread
  4. the thread fetches items from the service
  5. the main thread checks the io service queue and exuted it
  6. then it uses the queue

Thanks to Igor R. i couldn't have done it without him

and also http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 where i got how to make the shared pointer

Retaretable answered 9/12, 2012 at 9:39 Comment(2)
But that won't work as well. If you insist to use io_service::run() this way (i.e. to process only the items currently available in io_service queue and exit), you must call io_servicePoller->reset() prior to it -- otherwise run would work only for the 1st time.Abracadabra
But i added io_servicePoller->reset(); after io_servicePoller->run(); anyways because i respect you and i am sure you are more experienced then meRetaretable

© 2022 - 2024 — McMap. All rights reserved.