The goal is to have a conduit with the following type signature:
    protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
The conduit should repeatedly parse protocol buffers (using the ByteString -> a
function) received via TCP/IP (using the network-conduit
package).
The wire message format is
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(The curly braces are not part of the protocol; they are only used here to separate the entities.)
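To make the format concrete, here is a minimal sketch of the sending side, using only the binary and bytestring packages. The names frame and frameAll are hypothetical helpers introduced for illustration; they are not part of the code in question or of any conduit package.

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Binary.Put (putByteString, putWord32be, runPut)

-- | Frame one payload: a 4-byte big-endian length followed by the payload.
-- (Hypothetical helper; assumes the payload is shorter than 2^32 bytes.)
frame :: BS.ByteString -> BSL.ByteString
frame payload = runPut $ do
  putWord32be (fromIntegral (BS.length payload))
  putByteString payload

-- | Concatenate several framed payloads into one wire stream.
frameAll :: [BS.ByteString] -> BSL.ByteString
frameAll = BSL.concat . map frame
```

A stream built this way can serve as test input: a 3-byte payload followed by a 1-byte payload yields 4 + 3 + 4 + 1 = 12 bytes on the wire.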
The first idea was to use sequenceSink
to repeatedly apply a Sink
that is able to parse one ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                       -- read protobuf length
           let len :: Word32
               len = B.decode lenBytes                 -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen             -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
It doesn't work (it only works for the first protocol buffer) because there seem to be a number of "leftover" bytes already read from the source but not consumed via CB.take
that get discarded.
And I found no way of pushing "the rest back into the source".
Did I get the concept entirely wrong?
PS: Even if I use protocol buffers here, the problem is not related to protocol buffers. To debug the problem I always use {length}{UTF8 encoded string}{length}{UTF8 encoded string}...
and a conduit similar to the above one (utf8StringConduit :: MonadResource m => Conduit ByteString m Text
).
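For comparison, this is roughly how the debug format can be decoded when the whole stream is already in memory. It is only a sketch built on Data.Binary.Get, with hypothetical names (getFrame, getFrames, decodeFrames); it does not use conduit and therefore sidesteps the chunking issue, so it is only useful as a reference to check a streaming version against.

```haskell
import qualified Data.ByteString.Lazy as BSL
import Data.Binary.Get (Get, getByteString, getWord32be, isEmpty, runGetOrFail)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

-- | Parse one {length}{UTF-8 bytes} frame.
-- Note: TE.decodeUtf8 throws an exception on invalid UTF-8.
getFrame :: Get T.Text
getFrame = do
  len <- getWord32be
  bytes <- getByteString (fromIntegral len)
  return (TE.decodeUtf8 bytes)

-- | Parse frames until the input is exhausted.
getFrames :: Get [T.Text]
getFrames = do
  done <- isEmpty
  if done
    then return []
    else (:) <$> getFrame <*> getFrames

-- | Left with an error message on truncated input,
-- Right with all decoded strings otherwise.
decodeFrames :: BSL.ByteString -> Either String [T.Text]
decodeFrames input =
  case runGetOrFail getFrames input of
    Left (_, _, err) -> Left err
    Right (_, _, texts) -> Right texts
```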
Update:
I just tried replacing the state (the unit value () in the sample above) with the remaining bytes, and replacing the CB.take calls with calls to a function that first consumes the already-read bytes (from the state) and calls await only as needed (when the state does not hold enough bytes). Unfortunately, that doesn't work either: as soon as the Source has no bytes left, sequenceSink no longer runs the sink, even though the state still contains remaining bytes :-(.
If you should be interested in the code (which isn't optimized or very good but should be enough to test):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
    in if stateLenSum >= neededLen
         then do let (firstChunk:state') = state
                     (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                     acc' = acc `BS.append` neededChunk
                     neededLen' = neededLen - BS.length neededChunk
                     state'' = if BS.null pushBack
                                 then state'
                                 else pushBack:state'
                 takeWithState acc' state'' neededLen'
         else do aM <- await
                 case aM of
                   Just a -> takeWithState acc (state ++ [a]) neededLen
                   Nothing -> error "to be fixed later"
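The buffering idea can also be sketched as a pure function, independent of conduit, which makes it easier to test how leftover bytes are carried from one incoming chunk to the next. Everything here (headerLength, splitFrames, feedChunk, feedAll) is a hypothetical helper, not part of the conduit API, and the sketch assumes a platform where Int can hold a 32-bit length. It does not address the sequenceSink behaviour described above; it only shows the leftover being returned explicitly, so the caller decides what to do with it when the input ends.

```haskell
import qualified Data.ByteString as BS
import Data.Bits (shiftL, (.|.))
import Data.List (mapAccumL)

-- | Decode a big-endian length header (expected to be 4 bytes long).
headerLength :: BS.ByteString -> Int
headerLength = BS.foldl' (\acc w -> (acc `shiftL` 8) .|. fromIntegral w) 0

-- | Split off every complete frame in the buffer. Returns the payloads
-- and the unconsumed remainder (a partial header or partial payload).
splitFrames :: BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitFrames buf
  | BS.length buf < 4 = ([], buf)   -- header not complete yet
  | BS.length rest < len = ([], buf) -- payload not complete yet
  | otherwise =
      let (payload, rest') = BS.splitAt len rest
          (more, leftover) = splitFrames rest'
      in (payload : more, leftover)
  where
    (hdr, rest) = BS.splitAt 4 buf
    len = headerLength hdr

-- | Feed one incoming chunk: prepend the previous leftover, emit every
-- complete payload, and return the new leftover as the next state.
feedChunk :: BS.ByteString -> BS.ByteString -> (BS.ByteString, [BS.ByteString])
feedChunk leftover chunk =
  let (payloads, leftover') = splitFrames (leftover `BS.append` chunk)
  in (leftover', payloads)

-- | Run over a list of chunks, as they might arrive from a socket.
feedAll :: [BS.ByteString] -> ([BS.ByteString], BS.ByteString)
feedAll chunks =
  let (leftover, payloadss) = mapAccumL feedChunk BS.empty chunks
  in (concat payloadss, leftover)
```

Chunk boundaries deliberately do not have to line up with frame boundaries: a chunk may end in the middle of a header or a payload, and the remainder is simply kept until the next chunk arrives.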
protobufBytes <- CB.take intLen. – Daedal
protobuf is preceded by 4 bytes that represent the length (in number of bytes) of the protobuf. My initial theory was that these 4 bytes might actually represent the length of the protobuf PLUS the 4 byte length header. This would result in your code reading 4 bytes past the actual end of the protobuf, erroneously consuming the 4 bytes that should represent the length of the next protobuf. – Daedal