-->

Rechunk a conduit into larger chunks using combina

2020-07-06 08:11发布

问题:

I am trying to construct a Conduit that receives as input ByteStrings (of around 1kb per chunk in size) and produces as output concatenated ByteStrings of 512kb chunks.

This seems like it should be simple to do, but I'm having a lot of trouble, most of the strategies I've tried using have only succeeded in dividing the chunks into smaller chunks, I haven't succeeded in concatenating larger chunks.

I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.

However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!

P.P.S. Is it ok to use lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation for bytestring and whether this will help, especially since I'm using BL.length which I guess might evaluate the thunk anyway?


Conclusion

Just to elaborate on Michael's answer and comments, I ended up with this conduit:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base) 
         => Int 
         -> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector

My understanding is that vectorBuilder will pay the cost for concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.

From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:

import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize

回答1:

How about this?

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print

There are lots of other approaches that you could take depending on your goals. If you wanted to have a strict buffer, then using blaze-builder of vectorBuilder would make a lot of sense. But this keeps the same type signature you have already.