Rechunk a conduit into larger chunks using combinators

I am trying to construct a Conduit that receives ByteStrings as input (around 1 KB per chunk) and produces concatenated ByteStrings of 512 KB chunks as output.

This seems like it should be simple, but I'm having a lot of trouble: most of the strategies I've tried have only succeeded in splitting the chunks into smaller chunks, and I haven't managed to concatenate them into larger ones.

I started out trying isolate, then takeExactlyE, and eventually conduitVector, but to no avail. In the end I settled on this:

import           Control.Monad              (when)
import           Data.Monoid                ((<>))
import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.

However, this seems very verbose given all the conduit functions that deal with chunking [1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!
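For comparison, newer releases of conduit (the 1.3 series) export a chunksOfE combinator from Data.Conduit.Combinators that does this regrouping directly: it is documented to split the input into chunks of the given size, with only the last one possibly shorter. The sketch below assumes such a version is installed; rechunk is a hypothetical name, and the tiny chunk size is only for display.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch assuming a newer conduit (1.3 series) whose Data.Conduit.Combinators
-- exports chunksOfE. 'rechunk' is a hypothetical name used for illustration.
import Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as CC
import qualified Data.ByteString as B

-- | Regroup small strict chunks into chunks of chunkSize bytes: small inputs
-- are concatenated, oversized ones are split, and only the last may be short.
rechunk :: Monad m => Int -> ConduitT B.ByteString B.ByteString m ()
rechunk = CC.chunksOfE

main :: IO ()
main = mapM_ print groups
  where
    groups = runConduitPure $
         CC.yieldMany ["ab", "cd", "e", "fgh", "i"]
      .| rechunk 4
      .| CC.sinkList
```

For the question's case this would be rechunk (512 * 1024), so each output should be roughly 512 of the 1 KB input chunks concatenated into one strict ByteString.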

P.P.S. Is it OK to use a lazy ByteString for the buffer, as I've done? I'm a bit unclear about the internal representation of ByteString and whether this will help, especially since I'm using BL.length, which I guess might evaluate the thunk anyway?
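On the P.P.S.: appending to the end of a lazy ByteString (buffer <> BL.fromStrict chunk) lazily rebuilds the chunk spine of everything already buffered, much like left-nested (++) on lists, so the total cost grows quadratically with the number of buffered chunks (about 512 here). A common alternative is to keep the strict chunks in a list, newest first, together with a running byte count (which the loop above already tracks via n), and copy them once when flushing. The following is a pure, conduit-free model of that idea; rechunkAtLeast is a hypothetical name.

```haskell
import qualified Data.ByteString.Char8 as BC

-- | Hypothetical pure model of the question's chunksOfAtLeast. The buffer is
-- a list of strict chunks (newest first) plus a running byte count, so each
-- append is O(1) and the bytes are copied only once, when a group is flushed.
rechunkAtLeast :: Int -> [BC.ByteString] -> [BC.ByteString]
rechunkAtLeast chunkSize = go [] 0
  where
    -- End of input: emit whatever is still buffered, if anything.
    go acc len []
      | len > 0   = [flush acc]
      | otherwise = []
    -- Flush once the buffer reaches chunkSize; oversized chunks are not split.
    go acc len (c : cs)
      | len' >= chunkSize = flush (c : acc) : go [] 0 cs
      | otherwise         = go (c : acc) len' cs
      where
        len' = len + BC.length c

    -- Restore arrival order and copy the chunks into one strict ByteString.
    flush = BC.concat . reverse

main :: IO ()
main = print (rechunkAtLeast 4 (map BC.pack ["ab", "cd", "e", "fgh", "i"]))
```

Running main should print ["abcd","efgh","i"]: "cd" tops the first buffer up to 4 bytes, and the trailing "i" is flushed at end of input.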


Conclusion

Just to elaborate on Michael's answer and comments, I ended up with this conduit:

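import           Control.Monad.Base         (MonadBase)   -- transformers-base
import           Control.Monad.Primitive    (PrimMonad)   -- primitive
import           Data.Conduit               ((=$=))
import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit.
-- vectorBuilder hands C.mapM_E a function that writes each byte into a
-- mutable vector of chunkSize elements, emitting the vector when it fills up.
-- fromByteVector (its import is not shown here) converts the resulting
-- storable Word8 vector back into a ByteString.
chunksOfE' :: (MonadBase base m, PrimMonad base)
           => Int
           -> C.Conduit B.ByteString m B.ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector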

My understanding is that vectorBuilder pays the cost of concatenating the smaller chunks up front, producing the aggregated chunks as strict ByteStrings.

From what I can tell, an alternative implementation that produces lazy ByteString chunks (i.e. "chunked chunks") might be preferable when the aggregated chunks are very large and/or feed into a naturally streaming interface such as a network socket. Here's my best attempt at the lazy ByteString version:

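import           Data.Conduit               ((=$=))
import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit.
-- CL.sequence reruns the inner sink until upstream is exhausted; each run
-- takes at most chunkSize elements and gathers them into one lazy sequence.
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index strict
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence (C.takeE chunkSize =$= C.sinkLazy)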
Runway, 21/8/2014 at 13:49
Runway: Also, incredibly, I haven't managed to find an example that does only this...

How about this?

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print
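The answer's code relies on ClassyPrelude. Below is a sketch of the same loop against plain conduit, assuming conduit 1.3 (where ConduitT and .| replace Conduit, =$= and $$, and takeCE is Data.Conduit.Combinators.takeE). Apart from imports, the main change is using BL.null: in GHC of that era Prelude's null worked only on lists, which is the likely source of the "Couldn't match type ‘[a0]’" error mentioned in the comments.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch assuming conduit >= 1.3, without ClassyPrelude.
import Control.Monad (unless)
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as CC
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

chunksOfAtLeast :: Monad m => Int -> ConduitT B.ByteString BL.ByteString m ()
chunksOfAtLeast chunkSize = loop
  where
    loop = do
      -- takeE passes through at most chunkSize bytes, splitting the chunk that
      -- crosses the boundary and leaving the rest as leftover for the next round.
      lbs <- CC.takeE chunkSize .| CC.sinkLazy
      -- BL.null rather than Prelude's null, which does not accept ByteStrings.
      unless (BL.null lbs) $ do
        yield lbs
        loop

main :: IO ()
main = mapM_ print $ runConduitPure $
     CC.yieldMany ["hello", "there", "world!"]
  .| chunksOfAtLeast 3
  .| CC.sinkList
```

This should produce "hel", "lot", "her", "ewo", "rld" and "!". Because takeE both splits and merges, every group except the last has exactly chunkSize bytes, so larger sizes such as 8 or 512 * 1024 work the same way.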

There are lots of other approaches you could take, depending on your goals. If you wanted a strict buffer, then using blaze-builder or vectorBuilder would make a lot of sense. But this version keeps the type signature you already have.
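As a sketch of the strict-buffer route, the loop can accumulate each group as a Builder instead of a lazy ByteString. This assumes conduit 1.3 and uses bytestring's own Data.ByteString.Builder (the successor to blaze-builder) rather than the blaze-builder package itself; chunksOfStrict is a hypothetical name.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch assuming conduit >= 1.3 and bytestring's Data.ByteString.Builder.
import Control.Monad (unless)
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as CC
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL

-- | Hypothetical strict-output variant: each group of chunkSize bytes is
-- collected as a Builder (cheap appends) and copied into one strict ByteString.
chunksOfStrict :: Monad m => Int -> ConduitT B.ByteString B.ByteString m ()
chunksOfStrict chunkSize = loop
  where
    loop = do
      builder <- CC.takeE chunkSize .| CC.foldMap BB.byteString
      let bs = BL.toStrict (BB.toLazyByteString builder)
      -- An empty group means upstream is exhausted.
      unless (B.null bs) $ do
        yield bs
        loop

main :: IO ()
main = mapM_ print $ runConduitPure $
     CC.yieldMany ["hello", "there", "world!"]
  .| chunksOfStrict 3
  .| CC.sinkList
```

The grouping is identical to the sinkLazy version; only the output type changes, from lazy to strict ByteString, at the price of copying each group once.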

Malta answered 21/8/2014 at 15:06
Runway: Thanks!! I think having multiple approaches is one of the things that has really confused me. I expected something like C.takeExactlyE chunkSize C.concat to work, and I'm still not sure I understand why it doesn't.
Runway: Oh, or rather, I felt like I should be able to write something very similar to forever $ C.takeExactlyE chunkSize C.concat, except that forever doesn't terminate...
Runway: Sorry, I'm having some trouble compiling your example: Couldn't match type ‘[a0]’ with ‘BL.ByteString’ (I'm not using ClassyPrelude). Also, does this work with chunksOfAtLeast 8? I'm trying to get larger chunks rather than smaller ones, for a file upload over HTTPS...
Malta: I think you answered your forever point yourself: forever never terminates. Regarding takeExactlyE vs takeE: the only difference is when the downstream doesn't fully consume the input, but sinkLazy always consumes all of its input, so the two can be used interchangeably here. Instead of concat, fold is the combinator you're looking for; it would simply change the output type from LByteString to ByteString.
Runway: Do you think it would make sense for me to abstract this into something like a takeForever function, with behaviour similar to awaitForever?
Runway: What I mean is that I'd like to get to a combinator-style solution if possible, eliding the loop... EDIT: Or are you saying I should try using fold? Thanks, let me try that first.
Malta: Regarding fold: there are three different ways I can think of to aggregate the ByteStrings: into a lazy ByteString, folding them together, or converting them to Builders and concatenating those. All of them are valid; it depends on what you want to do with the output. It's certainly possible to put together some kind of combinator like takeForever. There's also the possibility you mentioned of using conduitVector.
Runway: That conduitVector solution in your paste seems perfect to me; I'll give it a try. I have a little trouble understanding the performance implications of the various solutions, but that's probably down to my inexperience with things like Builder. It's always awesome when a library author answers your SO question, thanks!
Malta: One more version based on vectorBuilder, which is likely a bit faster: lpaste.net/8310511707713699840
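The three aggregation options from the comments can be compared on a single group of chunks. This is a plain bytestring sketch, not conduit code; conduit's fold combines stream values monoidally, so for one group its result corresponds to mconcat here.

```haskell
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- One group of small input chunks, standing in for chunkSize bytes of input.
pieces :: [BC.ByteString]
pieces = map BC.pack ["hel", "lo", " wor", "ld"]

-- 1. Lazy ByteString: links the existing chunks together; no bytes are copied.
asLazy :: BL.ByteString
asLazy = BL.fromChunks pieces

-- 2. Folding with the Monoid instance: one strict ByteString, bytes copied.
asFolded :: BC.ByteString
asFolded = mconcat pieces

-- 3. Builder: appends are cheap; small chunks like these are written into
--    fresh buffers when the Builder is run.
asBuilt :: BL.ByteString
asBuilt = BB.toLazyByteString (foldMap BB.byteString pieces)

main :: IO ()
main = do
  print (BL.toChunks asLazy)          -- the original four pieces
  print asFolded
  print (BL.toStrict asBuilt == asFolded)
```

All three hold the same bytes; they differ in when (and whether) the bytes are copied, which is what decides between them for a 512 KB upload buffer.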
