Ben's method works, but it is a terrible choice for real-world applications. With his method there is no in-memory buffer at all, so you are essentially outputting every single character individually.
To achieve the goal, you must create two types of classes.
A filebuf class derived from std::streambuf with the virtual methods sync(), overflow(), xsputn(), seekoff(), seekpos()
overridden. It also needs to have appropriate buffering.
A stream class derived from std::basic_ostream. It should have a private member as your customized filebuf, for example buf
, and call std::basic_ios<CharT,Traits>::init like this->init(&buf)
in its constructor. You DON'T need to define any other methods because std::basic_ostream
will handle it for you. Once the initialization step is finished, this->rdbuf()
will return &buf
.
The underlying call stack is like the following
basic_ostream& operator<<:
rdbuf()->sputn
, which will call rdbuf()->xsputn
basic_ostream& put( char_type ch )
rdbuf()->sputc
, which will call rdbuf()->overflow
basic_ostream& write( const char_type* s, std::streamsize count)
rdbuf()->sputn
, which will call rdbuf()->xsputn
pos_type tellp()
rdbuf()->pubseekoff
, which will call rdbuf()->seekoff
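basic_ostream& seekp( pos_type pos ) (the overload seekp( off_type off, std::ios_base::seekdir dir ) goes through rdbuf()->pubseekoff and therefore rdbuf()->seekoff instead)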
rdbuf()->pubseekpos
, which will call rdbuf()->seekpos
basic_ostream& flush()
rdbuf()->pubsync
, which will call rdbuf()->sync
Here is an example; the full example is available at https://github.com/luohancfd/mpistream
In most scenarios, you only need to change the functions open_file, close_file, seek_pos, tell_pos, and write_data. By tweaking buf_size
, you can get a significant performance improvement.
// GPL v3.0
class MPI_filebuf : public std::streambuf {
public:
using Base = std::streambuf;
using char_type = typename Base::char_type;
using int_type = typename Base::int_type;
using pos_type = typename Base::pos_type;
using off_type = typename Base::off_type;
private:
static const std::streamsize buf_size = BUFSIZ;
char buffer_[buf_size];
MPI_File fhw;
bool opened;
/**
* @brief Always save one extra space in buffer_
* for overflow
*/
inline void reset_ptr() { setp(buffer_, buffer_ + buf_size - 1); }
protected:
/**
* @brief For output streams, this typically results in writing the contents
* of the put area into the associated sequence, i.e. flushing of the output
* buffer.
*
* @return int Returns 0 on success, -1 otherwise. The base class
* version returns 0.
*/
inline int sync() override {
int ret = 0;
if (pbase() < pptr()) {
const int_type tmp = overflow();
if (traits_type::eq_int_type(tmp, traits_type::eof())) {
ret = -1;
}
}
return ret;
}
/**
* @brief Write overflowed chars to file, derived from std::streambuf
* It's user's responsibility to maintain correct sequence of
* output as we are using shared file pointer
*
* @param ch
* @return int_type Returns unspecified value not equal to Traits::eof() on
* success, Traits::eof() on failure.
*/
inline int_type overflow(int_type ch = traits_type::eof()) override {
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/bits/fstream.tcc
int_type ret = traits_type::eof();
const bool testeof = traits_type::eq_int_type(ch, ret);
if (pptr() == nullptr) {
reset_ptr();
if (!testeof) {
ret = sputc(ch);
}
} else {
if (!testeof) {
*pptr() = traits_type::to_char_type(ch);
pbump(1);
}
if (write(pbase(), pptr() - pbase())) {
ret = traits_type::not_eof(ch);
}
reset_ptr();
}
return ret;
}
/**
* @brief Writes \c count characters to the output sequence from the character
* array whose first element is pointed to by \c s . Overwrite this function
* to achieve no_buffered I/O
*
* @param s
* @param n
* @return std::streamsize
*/
inline std::streamsize xsputn(const char_type *s,
std::streamsize n) override {
std::streamsize bufavail = epptr() - pptr();
std::streamsize ret = n;
// fill buffer up to "buf_size"
std::streamsize nfill = std::min(n, bufavail + 1);
std::copy(s, s + nfill, pptr());
pbump(nfill); // if nfill == bufavail+1, pptr() == epptr()
if (nfill == bufavail + 1) {
// equiv: bufavail + 1<= n
if (!write(pbase(), pptr() - pbase())) {
ret = -1;
} else {
reset_ptr();
s += nfill;
n -= nfill;
/*
repeatedly write every chunk until there is
less data than buf_size - 1
*/
while (n >= buf_size - 1) {
write(s, buf_size);
s += buf_size;
n -= buf_size;
}
std::copy(s, s + n, pptr());
pbump(n);
}
}
return ret;
}
/**
* @brief Sets the position indicator of the input and/or output
* sequence relative to some other position. It will flush
* the internal buffer to the file
* @note This function is collective, which means seekp(), tellp()
* need to be called by all processors
*
* @param off relative position to set the position indicator to.
* @param dir defines base position to apply the relative offset to.
* It can be one of the following constants: beg, cur, end
* @param which
* @return pos_type The resulting absolute position as defined by the position
* indicator.
*/
inline pos_type
seekoff(off_type off, std::ios_base::seekdir dir,
__attribute__((__unused__))
std::ios_base::openmode which = std::ios_base::out) override {
int ret = pos_type(off_type(-1));
if (is_open()) {
int whence;
if (dir == std::ios_base::beg)
whence = MPI_SEEK_SET;
else if (dir == std::ios_base::cur)
whence = MPI_SEEK_CUR;
else
whence = MPI_SEEK_END;
sync(); /*!< write data to the file */
if (off != off_type(0) || whence != SEEK_CUR) {
if (MPI_File_seek_shared(fhw, off, whence)) {
// fail to seek
return ret;
}
}
MPI_Offset tmp;
MPI_File_get_position_shared(fhw, &tmp);
ret = pos_type(tmp);
}
return ret;
}
inline pos_type seekpos(pos_type pos, __attribute__((__unused__))
std::ios_base::openmode which =
std::ios_base::out) override {
return seekoff(off_type(pos), std::ios_base::beg);
}
/**
* @brief Method doing the real writing. It moves the data in the
* internal buffer to the file
*
* @param pbeg
* @param nch
* @return true Succeed to write
* @return false Fail to write
*/
inline bool write(const char_type *pbeg, std::streamsize nch) {
return nch == 0 ||
!MPI_File_write_shared(fhw, pbeg, nch, MPI_CHAR, MPI_STATUS_IGNORE);
}
public:
MPI_filebuf() : buffer_{}, opened(false) {
setp(buffer_, buffer_ + buf_size - 1);
}
virtual ~MPI_filebuf() override {
if (opened)
close();
};
/**
* @brief return nullptr if fail
*
* @param file_name
* @return MPI_filebuf*
*/
MPI_filebuf *open(const char file_name[]);
inline bool is_open() const { return opened; }
MPI_filebuf *close() {
sync();
return MPI_File_close(&fhw) ? nullptr : this;
}
};
/* ---------------------------------------------------------------------- */
/**
 * @brief Output stream whose stream buffer is an MPI_filebuf member.
 *
 * All formatting and write operations come from std::basic_ostream<char>;
 * this class only owns the buffer, registers it with the stream, and exposes
 * open() / is_open() / close().
 */
class mpistream : public std::basic_ostream<char> {
public:
  // Types
  using Base = std::basic_ostream<char>;
  using int_type = Base::int_type;
  using char_type = Base::char_type;
  using pos_type = Base::pos_type;
  using off_type = Base::off_type;
  using traits_type = Base::traits_type;

  // Non-standard types:
  using filebuf_type = MPI_filebuf;
  using ostream_type = Base;

private:
  filebuf_type filebuf;

public:
  /** @brief Create a stream that is attached to its buffer but has no file. */
  mpistream() : ostream_type(), filebuf() { this->init(&filebuf); }

  /**
   * @brief Create a stream and immediately try to open \c file_name;
   *        the outcome is reflected in the stream state (see open()).
   */
  mpistream(const char file_name[]) : ostream_type(), filebuf() {
    this->init(&filebuf);
    open(file_name);
  }

  mpistream(const mpistream &) = delete;

  /**
   * @brief Move-construct from \c other and point the stream at this
   *        object's own buffer member.
   */
  mpistream(mpistream &&other)
      : ostream_type(std::move(other)), filebuf(std::move(other.filebuf)) {
    ostream_type::set_rdbuf(&filebuf);
  }

  ~mpistream() {}

  /**
   * @brief Open \c file_name through the buffer. Clears the stream state on
   *        success and sets failbit when the buffer reports failure.
   */
  void open(const char file_name[]) {
    if (filebuf.open(file_name) != nullptr) {
      this->clear();
      return;
    }
    this->setstate(std::ios_base::failbit);
  }

  /** @brief Whether the buffer reports an open file. */
  bool is_open() const { return filebuf.is_open(); }

  /** @brief Close the buffer's file; sets failbit when that fails. */
  void close() {
    if (filebuf.close() == nullptr) {
      this->setstate(std::ios_base::failbit);
    }
  }
};