istream and ostream with shared streambuf mutually thread-safe for duplex I/O?

I've derived a custom streambuf for buffered network socket I/O, overriding underflow, overflow, and sync so that underflow is thread-safe with respect to the other two (I have separate internal input and output buffers). That works fine, but I want to use it for full-duplex I/O, where one thread can receive while another is sending. So I'd like to use an istream in the receiving thread and an ostream in the sending thread, both sharing the network streambuf, since it abstracts away all the socket details. My question is: if the input and output buffers are separate, to what extent are the streambuf members affected by input operations on an istream disjoint from those affected by output operations on an ostream?

It would be better to be able to do that than to separate the socket handling from my streambuf abstraction so that the socket can be shared between an istream and an ostream with separate streambufs. Then I'd also need two versions of the streambuf: one with a single internal buffer (for use in an istream only or an ostream only) and one with two internal buffers, as I have now, for use in an iostream. That sucks, as it means additional classes and code duplication.
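For concreteness, here is a minimal sketch of the kind of buffer described above: one streambuf with separate input and output arrays, where underflow() touches only the input side and overflow()/sync() touch only the output side. It assumes POSIX read()/write() and takes a read descriptor and a write descriptor (for a socket you would pass the same descriptor twice). The class name duplex_fd_buf and the 256-byte buffer sizes are hypothetical, and, as the answers below explain, sharing it between an istream in one thread and an ostream in another is not something the standard guarantees.

```cpp
#include <cerrno>
#include <cstddef>
#include <streambuf>
#include <unistd.h>  // POSIX read(), write(), ssize_t

// Hypothetical duplex buffer: separate get and put arrays over two descriptors.
class duplex_fd_buf : public std::streambuf {
public:
    duplex_fd_buf(int read_fd, int write_fd) : rfd_(read_fd), wfd_(write_fd) {
        setg(in_, in_, in_);             // empty get area: first read calls underflow()
        setp(out_, out_ + sizeof out_);  // whole output array available
    }
    ~duplex_fd_buf() override { flush_out(); }

protected:
    // Input side only: refill in_ from the read descriptor.
    int_type underflow() override {
        ssize_t n;
        do {
            n = ::read(rfd_, in_, sizeof in_);
        } while (n < 0 && errno == EINTR);
        if (n <= 0) return traits_type::eof();  // end of input or error
        setg(in_, in_, in_ + n);
        return traits_type::to_int_type(in_[0]);
    }

    // Output side only: drain out_, then store the pending character.
    int_type overflow(int_type ch) override {
        if (flush_out() != 0) return traits_type::eof();
        if (!traits_type::eq_int_type(ch, traits_type::eof())) {
            *pptr() = traits_type::to_char_type(ch);
            pbump(1);
        }
        return traits_type::not_eof(ch);
    }

    // Output side only.
    int sync() override { return flush_out(); }

private:
    // Write everything in [pbase(), pptr()) and reset the put area.
    int flush_out() {
        const char* p = pbase();
        while (p < pptr()) {
            ssize_t n = ::write(wfd_, p, static_cast<std::size_t>(pptr() - p));
            if (n < 0) {
                if (errno == EINTR) continue;
                return -1;
            }
            p += n;
        }
        setp(out_, out_ + sizeof out_);
        return 0;
    }

    int rfd_;
    int wfd_;
    char in_[256];
    char out_[256];
};
```

With a connected socket `s`, `duplex_fd_buf buf(s, s); std::istream in(&buf); std::ostream out(&buf);` gives the arrangement asked about. The istream must not be `tie()`d to the ostream; otherwise every input operation would flush the output side from the reading thread.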

Baty answered 1/4, 2012 at 9:42 Comment(2)
At first glance I had some doubts, but this looks like the best question I've seen here in a while. +1 – Coimbatore
Exactly what I need right now too. I hope the answers will be complete. *scrolls down* – Mano

There is no special guarantee for std::streambuf (or std::basic_streambuf<...>) beyond what is generally given. That is, multiple threads may read the object's state at the same time, but while one thread modifies the object's state, no other thread may access the object. Both reading and writing characters modify the stream buffer's state, i.e. from a formal point of view you can't use them concurrently without external synchronization.
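To see why the standard can't promise more for every stream buffer, consider std::stringbuf opened for both input and output: it keeps a single character sequence, and the standard treats every character written so far as part of the input sequence, so its input side necessarily consults how far the output side has advanced. The small function below (the name write_then_read is ours) writes through the put area and reads the same characters back through the get area of the same object; running those two halves in different threads without a lock would not be safe.

```cpp
#include <cstddef>
#include <ios>
#include <sstream>
#include <string>

// Write text through the put area of a std::stringbuf, then read it back
// through the get area of the SAME buffer. This only works because the
// input side of std::stringbuf looks at what the output side has written.
std::string write_then_read(const std::string& text) {
    std::stringbuf sb(std::ios_base::in | std::ios_base::out);
    sb.sputn(text.data(), static_cast<std::streamsize>(text.size()));  // output side
    std::string back(text.size(), '\0');
    std::streamsize n = sb.sgetn(&back[0], static_cast<std::streamsize>(back.size()));  // input side
    back.resize(static_cast<std::size_t>(n));
    return back;
}
```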

Internally, the two buffers are entirely separate and have nothing to do with each other. The operations on stream buffers modify them in a rather structured way, and I can't imagine that any implementation would have an explicit interaction between the two sets of pointers. That is, in practical terms I don't think any synchronization is necessary between reading and writing. However, I hadn't realized before that the two sets of buffer pointers may share the same cache lines, which may at least cause performance problems (false sharing). I don't think this should cause any correctness problems.

The only resource possibly shared between the two buffers is the std::locale object, which is meant to be stateless, however. Also, std::streambuf doesn't use this object itself: it is your stream buffer that may use some of its facets (e.g. the std::codecvt<...> facet). As the locale is changed via a call to the virtual function imbue(), you can intercept this change and do whatever synchronization is needed if your stream buffer uses the locale.
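A minimal sketch of intercepting imbue(), assuming the buffer caches a std::codecvt facet for its conversions and keeps one conversion state per direction (the class and member names are hypothetical). pubimbue() calls the protected virtual imbue() with the new locale, so the override can swap the cached facet under a mutex; the conversion code in underflow()/overflow(), not shown here, would read the facet under the same mutex.

```cpp
#include <cwchar>  // std::mbstate_t
#include <locale>
#include <mutex>
#include <streambuf>

// Hypothetical stream buffer that caches a codecvt facet for its conversions.
class locale_aware_buf : public std::streambuf {
public:
    using cvt_type = std::codecvt<char, char, std::mbstate_t>;

    locale_aware_buf() : cvt_(&std::use_facet<cvt_type>(getloc())) {}

    const cvt_type* codecvt_facet() const {
        std::lock_guard<std::mutex> lk(m_);
        return cvt_;
    }
    int imbue_calls() const {
        std::lock_guard<std::mutex> lk(m_);
        return imbue_calls_;
    }

protected:
    // pubimbue(loc) calls this with the new locale; take the facet from `loc`.
    void imbue(const std::locale& loc) override {
        std::lock_guard<std::mutex> lk(m_);
        cvt_ = &std::use_facet<cvt_type>(loc);
        in_state_ = std::mbstate_t();   // restart conversion in both directions
        out_state_ = std::mbstate_t();
        ++imbue_calls_;
    }

private:
    mutable std::mutex m_;        // guards every member below
    const cvt_type* cvt_;
    std::mbstate_t in_state_{};   // conversion state for the input (underflow) side
    std::mbstate_t out_state_{};  // conversion state for the output (overflow) side
    int imbue_calls_ = 0;
};
```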

In summary, the standard doesn't guarantee that concurrent threads can read and write through the same stream buffer. In practice, the DS9k is probably the only system where it would fail, although the two threads may end up effectively synchronized because the buffer pointers end up in shared cache lines.

Quacksalver answered 1/4, 2012 at 11:45 Comment(5)
"… the std::locale object which is meant to be stateless…": worth noting that he needs two mbstate_t members, separate for input and output. What is DS9k? – Coimbatore
The Death Station 9000 (DS9k) is a hypothetical implementation of the standard that is deliberately as unhelpful as possible: wherever the standard gives it freedom, it uses that freedom to cause as much damage as possible. – Oxidation
I see that istream has a sync() member, which calls the streambuf's sync(). I haven't seen anything about other members of istream calling sync() themselves, so I'm assuming it's only an issue if the user calls it, and I can override it either to do nothing or to make the streambuf try to read more data from the socket into the buffer. – Baty
But about the cache line issue: it seems there's no way to deal with it by defining one's own pointers with padding in between and then overriding eback, egptr, epptr, gbump, gptr, pbump, pbase, pptr, setg, and setp, because at least the implementation I'm looking at in Visual C++ internally sometimes accesses the pointers using other, non-standard functions (for example, sputc calls _Pnavail and _Pninc). I guess the only way is to override most functions in streambuf. – Baty
The sync() method is used for output, to flush the current buffer. It is meaningless for input, and if std::istream calls it, this seems to be wrong. With respect to the buffer pointers, there is indeed nothing you can do about them possibly causing unnecessary synchronization. – Oxidation

The input and output sequences are essentially independent. There's a nice diagram at cppreference.com:

Diagram of streambuf members

The only thing shared between the input and output sequences is the locale object which contains the codecvt facet used to perform text encoding translation.
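A quick way to see that independence is a streambuf whose get area and put area are two different arrays (split_buf is a hypothetical name; the array sizes and contents are arbitrary). Base-class output operations such as sputc() move only the put pointers, and input operations such as sbumpc() move only the get pointers.

```cpp
#include <streambuf>

// Hypothetical buffer with one array for the get area and another for the put area.
class split_buf : public std::streambuf {
public:
    split_buf() {
        setg(in_, in_, in_ + sizeof in_);  // treat in_ as already-received input
        setp(out_, out_ + sizeof out_);    // out_ is free space for output
    }
    // Make the six protected pointer accessors public for inspection.
    using std::streambuf::eback;
    using std::streambuf::gptr;
    using std::streambuf::egptr;
    using std::streambuf::pbase;
    using std::streambuf::pptr;
    using std::streambuf::epptr;

private:
    char in_[4] = {'a', 'b', 'c', 'd'};
    char out_[4] = {};
};
```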

In theory, changing the text encoding midstream would be thread-unsafe, but in practice the libraries don't support that operation at all anyway!

You should be good to go.

Coimbatore answered 1/4, 2012 at 10:37 Comment(16)
Changing the local midstream would lead to inconsistent behavior (at least for the codecvt, it may be in the middle of translating (do_in/do_out) a chunk of input/output). Hence changing a local after anything has been read/written from/to a stream would seem like a bad idea. A consequence of this is that the streambuf class allows derived classes to cache the facet values retrieved from a locale. Also, some streams put restrictions/conditions on changing the local if they are not at the beginning (i.e. after reads/writes have been done). – Robbery
@Loki: Changing the locale midstream is just like closing the file and reopening it with a different locale. There are no conceptual barriers; writing an invalid sequence results in an exception. It is supported by the standard but by few if any implementations. – Coimbatore
I see no reference to opening and closing the streams when using imbue() or pubimbue() in the standard. There are conceptual barriers, as I detailed above. – Robbery
@LokiAstari What? The lack of references to !is_open() or to the file unconditionally being at the beginning means that it is supported, by default. More specifically, see §27.9.1.5/20. Only "state-dependent" locales are forbidden. I implemented support for changing to/from state-dependent locales in GCC, but didn't commit it. The only extension is to write a codecvt termination sequence before changing. – Coimbatore
Yes, it is supported if you are at the beginning or you don't have a state-dependent locale. Neither is a good assumption to make (which is why I did not make it in the first comment). Writing a codecvt termination sequence does not seem to be the correct action to me, but I could be wrong. You will need to expand on that with some reference documentation, but it seems to me that it would basically corrupt the existing data on the stream (unless you are trashing it, in which case you lose it); again, neither is a great option. – Robbery
@LokiAstari Who are you talking about making assumptions? The library has to support what it's supposed to support. The user can do any supported thing, which includes changing non-state-dependent locales. By the way, few encodings out there are state dependent. Changing midstream between UTF-8 and the trivial codecvt would be useful for text files with embedded binary data, but existing iostreams libraries don't support it. That's a problem! – Coimbatore
@Potatoswatter: Yes, that is why you can't actually change locals on a stream after you start reading from it. See: 27.5.5.3. You are the only one making dangerous assumptions here. – Robbery
@LokiAstari It's locale, not local. 27.5.5.3 defines ios_base members, which is irrelevant, and you need to pick one paragraph. You assume the user can't change it; I make no assumption, because the default is that they can. Good day! – Coimbatore
You are so wrong; you are making the assumptions. Writing code like this is just silly and will make the code break at some point in the future in a way that is totally undetectable and nearly impossible to find. My comment says: there are all these conditions that can cause a break (re-read my first comment), and thus making any of your assumptions is a bad idea. Your argument also breaks down, as most implementations of standard streams actually prevent you from changing the local (they silently have no effect and the local is not changed; try it on any file stream after open()). – Robbery
PS. I am not clear why 27.5.5.3 is irrelevant. It seems to be the crux of your misunderstanding and points to exactly why you are wrong. Maybe I am interpreting it incorrectly and you could explain why it is irrelevant; it may help me understand your argument better. – Robbery
@LokiAstari Writing code like what is silly? We're talking abstractly about what people are allowed to do with iostreams. I'm quite aware of how the implementations work, as I personally made extensive changes to GCC's basic_filebuf. 27.5.5.3 is irrelevant because it doesn't describe basic_filebuf; it's a long list of member functions of basic_ios. Perhaps you got the number wrong, just as you continue to misspell locale. – Coimbatore
No, the number is correct. Look for imbue() in the list (since we are talking about local, I thought you would be able to find it). But talking about buffers without talking about streams is pointless. They are designed to work together. Normally you use it indirectly via a stream. Thus that clause is relevant. Thus calling imbue() will usually silently fail (unless you hold to some very specific conditions when using it). Writing code that is fragile (usually caused by assumptions) is silly. Your advice will cause other people to write fragile code. – Robbery
@LokiAstari No, basic_streambuf is designed to work without a stream. I use it as such often; the interface is usable, and the specification is completely separate from ios. GCC's streambuf unit tests don't even use ios at all. imbue silently failing is a bug in the library. I gave no advice that anyone should call imbue midstream. I merely noted that it is supported by the standard but by no library I know of. And I said that very clearly: "In theory…". – Coimbatore
basic_streambuf can be used alone. It is not designed to be. Of course the unit tests don't use a stream; that is why they are called unit tests, not integration tests. It's not supported, as I pointed out several times. See 27.5.5.3. – Robbery
@LokiAstari If all the features of basic_streambuf can be exercised without resorting to basic_ios, then 27.5.5.3 is irrelevant to what it supports. – Coimbatore
Furthermore, all it says of imbue is: "locale imbue(const locale& loc); Effects: Calls ios_base::imbue(loc) (27.5.3.3) and if rdbuf()!=0 then rdbuf()->pubimbue(loc) (27.6.3.2.1). Returns: The prior value of ios_base::imbue()." How does this support your point? – Coimbatore

For full-duplex I/O you need two buffers. If you use streambuf interfaces for both, so that you can hook them up to the usual ostream and istream interfaces, the complete picture looks something like this:

A cross linked two streambuf interface

The two buffers are obviously completely independent and symmetrical, so we can ignore one side and just concentrate on a single buffer.

Moreover, it is safe to assume that there are only two threads: the reading thread and the writing thread. If more threads were involved, then two threads would be reading, or writing, at the same time, which would lead to undesirable race conditions and therefore makes no sense. We can assume that the user has some mechanism in place that ensures that only one thread at a time writes to a streambuf and, likewise, only one thread at a time reads from it.
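One way to provide the mechanism assumed here is to give each direction its own mutex: readers serialize on one, writers on the other, and one reader can still run alongside one writer. The duplex_channel wrapper below is a hypothetical sketch, and it only makes that reader/writer overlap safe if the underlying streambuf's two directions really are independent (std::stringbuf, for example, is not, so with it you would use only one direction at a time).

```cpp
#include <istream>
#include <mutex>
#include <ostream>
#include <streambuf>
#include <string>

// Hypothetical wrapper: one mutex per direction over a shared streambuf.
class duplex_channel {
public:
    explicit duplex_channel(std::streambuf* sb) : in_(sb), out_(sb) {}

    // Writers are serialized among themselves, so no other writer can
    // interleave characters into the middle of a line.
    void send_line(const std::string& s) {
        std::lock_guard<std::mutex> lk(write_mtx_);
        out_ << s << '\n' << std::flush;
    }

    // Readers are serialized among themselves.
    bool receive_line(std::string& s) {
        std::lock_guard<std::mutex> lk(read_mtx_);
        return static_cast<bool>(std::getline(in_, s));
    }

private:
    std::mutex read_mtx_;
    std::mutex write_mtx_;
    std::istream in_;   // used only while holding read_mtx_
    std::ostream out_;  // used only while holding write_mtx_
};
```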

In the most general case, the actual buffer consists of multiple contiguous memory blocks. Each put and get area lies entirely inside one such block. As long as they are in different memory blocks, they are, again, unrelated.

Each get/put area consists of three pointers: one that points to the start of the area (eback/pbase), one that points one character past the end of the area (egptr/epptr), and one that points to the current position in the area (gptr/pptr). A class derived from std::streambuf can read each of those pointers directly through protected accessors of the same name (eback(), pbase(), egptr(), epptr(), gptr() and pptr()). Note that here we mean the eback(), egptr() and gptr() of one streambuf and the pbase(), epptr() and pptr() of the other streambuf (see the image above).
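The cross-linking can be illustrated single-threaded with two minimal streambufs over one array: the array is the put area of one and the get area of the other. The names put_side, get_side and catch_up() are hypothetical; catch_up() stands in for whatever mechanism moves the reader's egptr() up to the writer's pptr(). Until that happens, the reader sees nothing, even though the characters are already in the shared memory.

```cpp
#include <streambuf>

// Hypothetical writer side: the shared array is its put area.
class put_side : public std::streambuf {
public:
    put_side(char* first, char* last) { setp(first, last); }
    using std::streambuf::pbase;
    using std::streambuf::pptr;
    using std::streambuf::epptr;
};

// Hypothetical reader side: the same array is its get area.
class get_side : public std::streambuf {
public:
    explicit get_side(char* first) { setg(first, first, first); }  // nothing readable yet
    using std::streambuf::eback;
    using std::streambuf::gptr;
    using std::streambuf::egptr;

    // Move egptr() up to the writer's pptr(): everything written becomes readable.
    void catch_up(const put_side& w) { setg(eback(), gptr(), w.pptr()); }
};
```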

std::streambuf has public functions that access or change these six pointers. They are:

<table style="width:100%">
<caption>Public member functions of <code>std::streambuf</code></caption>
<tr>
<th>Method</th><th>Changes and/or accesses</th>
</tr>
<tr>
<td><code>pubsetbuf()</code></td><td>Calls <code>setbuf()</code> of the most derived class</td>
</tr><tr>
<td><code>pubseekoff()</code></td><td>Calls <code>seekoff()</code> of the most derived class</td>
</tr><tr>
<td><code>pubseekpos()</code></td><td>Calls <code>seekpos()</code> of the most derived class</td>
</tr><tr>
<td><code>pubsync()</code></td><td>Calls <code>sync()</code> of the most derived class</td>
</tr><tr>
<td><code>in_avail()</code></td><td>Get area; calls <code>showmanyc()</code> if no read position is available</td>
</tr><tr>
<td><code>snextc()</code></td><td>Calls <code>sbumpc()</code> (which may call <code>uflow()</code>), then <code>sgetc()</code></td>
</tr><tr>
<td><code>sbumpc()</code></td><td><code>gptr</code>, possibly calls <code>uflow()</code></td>
</tr><tr>
<td><code>sgetc()</code></td><td><code>gptr</code>, possibly calls <code>underflow()</code></td>
</tr><tr>
<td><code>sgetn()</code></td><td>Calls <code>xsgetn()</code> of the most derived class</td>
</tr><tr>
<td><code>sputc()</code></td><td><code>pptr</code>, possibly calls <code>overflow()</code></td>
</tr><tr>
<td><code>sputn()</code></td><td>Calls <code>xsputn()</code> of the most derived class</td>
</tr><tr>
<td><code>sputbackc()</code></td><td><code>gptr</code>, possibly calls <code>pbackfail()</code></td>
</tr><tr>
<td><code>sungetc()</code></td><td><code>gptr</code>, possibly calls <code>pbackfail()</code></td>
</tr>
</table>

The protected member functions are

<table style="width:100%">
<caption>Protected member functions of <code>std::streambuf</code></caption>
<tr>
<th>Method</th><th>Changes and/or accesses</th>
</tr>
<tr>
<td><code>setbuf()</code></td><td>User defined (could be used for single array buffers)</td>
</tr><tr>
<td><code>seekoff()</code></td><td>User defined (repositions the get and/or put area, depending on <code>which</code>)</td>
</tr><tr>
<td><code>seekpos()</code></td><td>User defined (repositions the get and/or put area, depending on <code>which</code>)</td>
</tr><tr>
<td><code>sync()</code></td><td>User defined (could do anything; depending on which buffer this is, it could change either the get area or the put area)</td>
</tr><tr>
<td><code>showmanyc()</code></td><td>User defined (get area; if the put area uses the same allocated memory block, it can also access <code>pptr</code>)</td>
</tr><tr>
<td><code>underflow()</code></td><td>User defined (get area, but also strongly coupled to the put area)</td>
</tr><tr>
<td><code>uflow()</code></td><td>Calls <code>underflow()</code> and advances <code>gptr</code></td>
</tr><tr>
<td><code>xsgetn()</code></td><td>Get area (as if calling <code>sbumpc()</code> repeatedly), might call <code>uflow()</code></td>
</tr><tr>
<td><code>gbump()</code></td><td><code>gptr</code></td>
</tr><tr>
<td><code>setg()</code></td><td>Get area</td>
</tr><tr>
<td><code>xsputn()</code></td><td>Put area (as if calling <code>sputc()</code> repeatedly), might call <code>overflow()</code> or do something similar</td>
</tr><tr>
<td><code>overflow()</code></td><td>Put area</td>
</tr><tr>
<td><code>pbump()</code></td><td><code>pptr</code></td>
</tr><tr>
<td><code>setp()</code></td><td>Put area</td>
</tr><tr>
<td><code>pbackfail()</code></td><td>User defined (might be pure horror, i.e. get and put area)</td>
</tr>
</table>

We should split reading and writing actions into actions per (contiguous) memory block. Of course, a single call to, say, sputn() may write to multiple blocks, but we can lock and unlock per block action.

There are several significant states of a buffer, depicted in the picture below. Green arrows represent state transitions performed by the thread(s) that read data from the get area, while blue arrows represent transitions performed by the thread(s) that write data to the put area. In other words, two green actions cannot occur at the same time, nor can two blue actions, but a green and a blue action might happen at the same time.

Read and write streambuf transitions

I still have to write an implementation for this, but my approach will be to use a single mutex per buffer and to lock it only at the beginning of every action, to obtain the information needed to perform a read and/or write action. Then, at the end of that action, I lock the mutex again to see whether the other thread changed something and/or to finish the read/write with an administrative action.

Every time the writing thread bumps pptr, egptr is updated atomically, unless eback != pbase at the beginning of the write action, in which case egptr of course doesn't need updating. This requires locking a mutex before the bump and unlocking it only after egptr has been updated as well; the same mutex is therefore also locked when moving the get or put area. We may skip locking the mutex when bumping gptr itself: at the beginning of the corresponding read action there was data in the buffer, and a concurrent write action cannot change that, so there is no danger that the writing thread would try to move the get area at the same time.
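A heavily simplified sketch of that locking idea, under stated assumptions: a single fixed-size block that is never reused or wrapped around, exactly one writing and one reading thread, and the writer publishing its pptr() only in sync(). Publishing on every bump is not possible this way, because the base-class sputc() advances pptr() without calling any virtual function. The mutex protects only the administrative state (how many bytes are published, and whether the writer is done), never the character data itself. All names are hypothetical; this is not the design of the author's library.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <streambuf>
#include <vector>

// Hypothetical shared block: the put area of the writer's streambuf and,
// at the same time, the get area of the reader's streambuf.
struct shared_block {
    explicit shared_block(std::size_t n) : data(n) {}
    std::vector<char> data;
    std::mutex m;                // guards `published` and `closed` only
    std::condition_variable cv;
    std::size_t published = 0;   // bytes [0, published) are readable
    bool closed = false;         // writer will publish nothing more
};

class block_writer : public std::streambuf {
public:
    explicit block_writer(shared_block& b) : b_(b) {
        setp(b.data.data(), b.data.data() + b.data.size());
    }
    // Publish what is left and tell the reader that no more data will come.
    void close() {
        sync();
        {
            std::lock_guard<std::mutex> lk(b_.m);
            b_.closed = true;
        }
        b_.cv.notify_all();
    }

protected:
    // Administrative action: make everything up to pptr() visible to the reader.
    int sync() override {
        std::size_t n = static_cast<std::size_t>(pptr() - pbase());
        {
            std::lock_guard<std::mutex> lk(b_.m);
            b_.published = n;
        }
        b_.cv.notify_all();
        return 0;
    }
    // The block is full: publish it and refuse more (no wrap-around here).
    int_type overflow(int_type) override {
        sync();
        return traits_type::eof();
    }

private:
    shared_block& b_;
};

class block_reader : public std::streambuf {
public:
    explicit block_reader(shared_block& b) : b_(b) {
        char* p = b.data.data();
        setg(p, p, p);  // nothing readable yet
    }

protected:
    // Wait until the writer has published past gptr(), then extend egptr().
    int_type underflow() override {
        std::unique_lock<std::mutex> lk(b_.m);
        std::size_t consumed = static_cast<std::size_t>(gptr() - eback());
        b_.cv.wait(lk, [&] { return b_.published > consumed || b_.closed; });
        if (b_.published <= consumed) return traits_type::eof();
        setg(eback(), gptr(), eback() + b_.published);
        return traits_type::to_int_type(*gptr());
    }

private:
    shared_block& b_;
};
```

Wrapping around, reusing consumed space, and reacting to every bump of pptr() are precisely the parts this sketch leaves out.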

I'll edit this answer when I figure out more details.

Mano answered 21/12, 2018 at 22:26 Comment(4)
Thanks for your thorough description here. A cross-thread stream buffer seems to be missing from the standard library, which is surprising. Have you learned anything more about this? – Carn
Yes, I finished my library now; it is extremely difficult/complex. I mean EXTREMELY difficult stuff. I don't think support can be added to the standard simply because it is too difficult. The main problem (if there is A main problem) is that the streambuf API can cause changes to the (non-atomic) gptr and pptr pointers (through istream/ostream actions) that are out of the control of the coder that derives from std::streambuf; this makes std::streambuf inherently non-thread-safe. The hoops that I jumped through to get this to work are not really worth it. – Mano
See also github.com/CarloWood/evio/blob/master/README.StreamBuf#L53 – Mano
Sounds like a separate thread-safe class is needed. Thanks for all your work. – Carn

© 2022 - 2024 — McMap. All rights reserved.