c++ connect output stream to input stream
Asked Answered
P

3

20

What I would like to do is create a sort of "pipe" (like a pipe between processes), but between c++ iostreams within the same program. I have a function that requires an input stream as an argument, but my data is coming from an output stream. So is there a standard way to pipe the output of a std::ostream into the input of a std::istream?

Panelboard answered 13/9, 2012 at 16:42 Comment(2)
Does std::stringstream suit your need? If no, explain why.Arquit
There is an iostream (notice it has an i and an o at the beginning). You pump data in one end and it comes out there end. Is that what you want.Iliad
N
24

You can create a std::streambuf where the output goes to one buffer and std::overflow() blocks when the buffer becomes full. On the other end you'd have an input buffer which blocks on underflow() when the buffer becomes empty. Obviously, reading and writing would be in two different threads.

The tricky business is how to synchronize the two buffers: The streams don't use any synchronization operations while accessing the buffers. Only when any of the virtual functions is called you can intercept the operation and deal with the synchronization. On the other hand, not using a buffer is fairly inefficient. The way I would address this problem is by using a relatively small output buffer (e.g. 256 chars) and also override sync() to use this function for transfer of characters to the input buffer. The streambuf would use a mutex for the synchronization and a condition variable to block on a full input buffer on output and an empty input buffer on input. To support clean shutdown there should also be a function setting a flag that no more input is coming and all further output operations should fail.

Creating the actual implementation reveals that two buffers aren't sufficient: the threads accessing the input and the output buffer may be active when the respective other buffer blocks. Thus, a third, intermediate buffer is needed. With this small change to the above plan, below is some code (it uses tiny buffers to make sure there are actual overflows and underflows; for a real use at least the input buffer should probably be bigger).

// threadbuf.cpp                                                      -*-C++-*-
// ----------------------------------------------------------------------------
//  Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de         
//                                                                       
//  Permission is hereby granted, free of charge, to any person          
//  obtaining a copy of this software and associated documentation       
//  files (the "Software"), to deal in the Software without restriction, 
//  including without limitation the rights to use, copy, modify,        
//  merge, publish, distribute, sublicense, and/or sell copies of        
//  the Software, and to permit persons to whom the Software is          
//  furnished to do so, subject to the following conditions:             
//                                                                       
//  The above copyright notice and this permission notice shall be       
//  included in all copies or substantial portions of the Software.      
//                                                                       
//  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,      
//  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES      
//  OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND             
//  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT          
//  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,         
//  WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING         
//  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR        
//  OTHER DEALINGS IN THE SOFTWARE. 
// ----------------------------------------------------------------------------


#include <algorithm>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <streambuf>
#include <string>
#include <thread>

// ----------------------------------------------------------------------------

class threadbuf
    : public std::streambuf
{
private:
    typedef std::streambuf::traits_type traits_type;
    typedef std::string::size_type      string_size_t;

    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::string             d_out;
    std::string             d_in;
    std::string             d_tmp;
    char*                   d_current;
    bool                    d_closed;

public:
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64)
        : d_out(std::max(string_size_t(1), out_size), ' ')
        , d_in(std::max(string_size_t(1), in_size), ' ')
        , d_tmp(std::max(string_size_t(1), in_size), ' ')
        , d_current(&this->d_tmp[0])
        , d_closed(false)
    {
        this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1);
        this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]);
    }
    void close()
    {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            this->d_closed = true;
            while (this->pbase() != this->pptr()) {
                this->internal_sync(lock);
            }
        }
        this->d_condition.notify_all();
    }

private:
    int_type underflow()
    {
        if (this->gptr() == this->egptr())
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            while (&this->d_tmp[0] == this->d_current && !this->d_closed) {
                this->d_condition.wait(lock);
            }
            if (&this->d_tmp[0] != this->d_current) {
                std::streamsize size(this->d_current - &this->d_tmp[0]);
                traits_type::copy(this->eback(), &this->d_tmp[0],
                                  this->d_current - &this->d_tmp[0]);
                this->setg(this->eback(), this->eback(), this->eback() + size);
                this->d_current = &this->d_tmp[0];
                this->d_condition.notify_one();
            }
        }
        return this->gptr() == this->egptr()
            ? traits_type::eof()
            : traits_type::to_int_type(*this->gptr());
    }
    int_type overflow(int_type c)
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        if (!traits_type::eq_int_type(c, traits_type::eof())) {
            *this->pptr() = traits_type::to_char_type(c);
            this->pbump(1);
        }
        return this->internal_sync(lock)
            ? traits_type::eof()
            : traits_type::not_eof(c);
    }
    int sync()
    {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        return this->internal_sync(lock);
    }
    int internal_sync(std::unique_lock<std::mutex>& lock)
    {
        char* end(&this->d_tmp[0] + this->d_tmp.size());
        while (this->d_current == end && !this->d_closed) {
            this->d_condition.wait(lock);
        }
        if (this->d_current != end)
        {
            std::streamsize size(std::min(end - d_current,
                                          this->pptr() - this->pbase()));
            traits_type::copy(d_current, this->pbase(), size);
            this->d_current += size;
            std::streamsize remain((this->pptr() - this->pbase()) - size);
            traits_type::move(this->pbase(), this->pptr(), remain);
            this->setp(this->pbase(), this->epptr());
            this->pbump(remain);
            this->d_condition.notify_one();
            return 0;
        }
        return traits_type::eof();
    }
};

// ----------------------------------------------------------------------------

static void writer(std::ostream& out)
{
    for (std::string line; std::getline(std::cin, line); )
    {
        out << "writer: '" << line << "'\n";
    }
}

// ----------------------------------------------------------------------------

static void reader(std::istream& in)
{
    for (std::string line; std::getline(in, line); )
    {
        std::cout << "reader: '" << line << "'\n";
    }
}

// ----------------------------------------------------------------------------

int main()
{
    try
    {
        threadbuf sbuf;
        std::ostream out(&sbuf);
        std::istream in(&sbuf);

        std::thread write(&::writer, std::ref(out));
        std::thread read(&::reader, std::ref(in));

        write.join();
        sbuf.close();
        read.join();
    }
    catch (std::exception const& ex)
    {
        std::cerr << "ERROR: " << ex.what() << "\n";
    }
}
Novanovaculite answered 13/9, 2012 at 19:19 Comment(6)
+1 for the effort; the OP was certainly looking for a much more quick and easy fix.Bortman
Well, using the code above is quick for snybody but me :) I have seen similar request before, i.e., it may be useful for others, too. ... and it was an interesting exercise to actually implement what I only outlined before. Finally: I'm not aware of an easier fix!Gillman
You are a flipping legend Dietmar. I'm using this in a unit test and it works a treat. Thanks.Solve
One observation: in my reading class destructor, I have a while loop that does a timed_join on the reading thread, then an interrupt if the join timed out and I notice it sometimes needs to run that loop twice to get a clean exit. Otherwise I get a /include/boost/thread/pthread/condition_variable_fwd.hpp:46: boost::condition_variable::~condition_variable(): Assertion `!pthread_mutex_destroy(&internal_mutex)' failed.Solve
Great piece of code but I seem to have stumbled upon a possible deadlock in while (this->pbase() != this->pptr()) { this->internal_sync(lock); Let's say there is a read thread waiting in underflow()::d_condition.wait(lock) for the temp buffer to be non-empty. Then a call to close() happens and close() acquires the lock mutex. Close() then tries to fill the temp buffer from outbuffer in a loop (internal_sync). But it the function cannot empty outbuffer in one call, close() loops indefinitely on while (this->pbase() != this->pptr()) because the mutex is still held.Justus
And because the mutex is held, read thread cannot cooperate on emptying the temp buffer even though d_condition->notify_one() is called. Feel free to correct me.Justus
F
3

Dietmar Kühl's post is very complete. It provides a way to do this inter-thread, well done. But there is a simple way using boost::iostreams. I came across this today as I needed to learn how to handle streams(devices) and filters. My first try out it just worked. And it is a one line solution for your question.

boost::iostreams::copy(istream,ostream);

Here is a working example of where I'm at:

#include <iostream>
#include <sstream>
#include <boost/iostreams/copy.hpp>

const char* testz = R"(H4sIAJSHnl4AA61Uy3LCMAw8t19B + wF2QmAoM67ptZ / QU8Y4Alzix9hOH39fhRgnQGd66Qmx
Wq + 0kiZs86Xb2Qf4oKx5fixJ8bjhd / dsB9BshTxiPJsxD876WGuIohFRnECErd / XRmjgb + Jg
7cPs1UjCaEYTC7RQLXc2RC1CBP / SaOEl + e7fEGk1owMj0VMt1fBy + bRaVGWxXpJVWc3nK0bH
ZGJjO1B7YfbncohtYa / M6XW1KJ6KgtEByQQwSXy + KtdrrG + yHr0SzCUvvDNnWyW / a9dtWxUO
MLZj0YrhrTjCJ2yJgYiKA5YYojkqzT2jQ3BGg9udwP43YY57eAeJCi5DMvKyN9QHQ3u / doJD
lNbnrrz9HM0H23kJtXK8IvOqIuW6IAscwohnqrSdwYKMDkHGU034EG2H42pypp + ACrhqFfGc
uLEG0P8EmRJ7 + 06EgIxxEkOLOIQhM45j4vW6Lu4oG2SqARPVTuFFjy8PIBrw9c5bfbmbaeIs
dqvARBcPtYfQtXGiet32n8tP993LJH / pz2jxQpNN7f9TgcmB0RtbPT8dDqOTT8APTbkhEyYE
AAA =)";

int main(){
    std::istringstream source(testz, std::ios_base::in | std::ios_base::binary);

    namespace bio = boost::iostreams;
    bio::copy(source, std::cout);
}

What is really beautiful about the boost streams is that you can just add filters to do conversions on the stream. In my case, and why I left the long testz string, is I'll add a filter to the process, to unzip the data.

bio::filtering_istreambuf in;
in.push(bio::gzip_decompressor());
in.push(source);
bio::copy(in, std::cout);

Well, this is not working yet, I will have to figure out what extra filter I need to turn an email attachment into a qualified gzip stream. But this is a tearably elegant way to handle streams. The code stays simple at the top. I just have to push my new filter in front of the gzip_decompressor filter.

Fives answered 22/4, 2020 at 17:49 Comment(1)
Wow, it's been 8 years since I asked this, and I honestly haven't thought about it since then. But kudos for figuring out a solution!Panelboard
G
0

You may try to refer to the link.

std::cin.tie(&cout);

Gluttonize answered 17/2, 2024 at 9:55 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.