Conduit - Multiple output files within the pipeline

Posted 2019-04-06 14:05

Question:

I'm writing a programme where an input file is split into multiple files (Shamir's Secret Sharing Scheme).

Here's the pipeline I'm imagining:

  • source: use Conduit.Binary.sourceFile to read from the input
  • conduit: Takes a ByteString, produces [ByteString]
  • sink: Takes [ByteString] from the conduit, and writes each ByteString in the list to its corresponding file. (Say our input [ByteString] is called bsl; then bsl !! 0 will be written to file 0, bsl !! 1 to file 1, and so on.)

I found a question regarding multiple input files here, but in their case the whole pipeline is run once for each input file, whereas for my programme I'm writing to multiple output files within the pipeline.

I'm also looking through the Conduit source code here to see if I can implement a multiSinkFile myself, but I'm slightly confused by the Consumer type of sinkFile, and more so if I try to dig deeper... (I'm still a beginner)

So, the question is, how should I go about implementing a function like multiSinkFile which allows multiple files to be written as part of a sink?

Any tips are appreciated!

Clarification

Let's say we want to do Shamir's Secret Sharing on a file containing the binary value of "ABCDEF", splitting it into 3 parts.

(So we have our input file srcFile and our output files outFile0, outFile1 and outFile2.)

We first read "ABC" from the file and process it, which gives us a list of, say, ["133", "426", "765"]. So "133" will be written to outFile0, "426" to outFile1 and "765" to outFile2. We then read "DEF" from srcFile, process it, and write the corresponding outputs to each output file.
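To make the intended data flow concrete, here is a minimal pure sketch of what each output file should end up containing. It uses only base and no Conduit. The names chunksOf, fakeShares and fileContents are made up for illustration, and fakeShares is only a placeholder for the real share computation (it is not Shamir's scheme).

```haskell
import Data.List (transpose)

-- Split the input into fixed-size chunks; the last chunk may be shorter.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
  | n <= 0    = error "chunksOf: chunk size must be positive"
  | null xs   = []
  | otherwise = let (h, t) = splitAt n xs in h : chunksOf n t

-- Hypothetical placeholder for the processing step (NOT Shamir's scheme):
-- turns one chunk into k outputs, one per output file.
fakeShares :: Int -> String -> [String]
fakeShares k chunk = [show i ++ ":" ++ chunk | i <- [0 .. k - 1]]

-- Final contents of each output file: file i receives the i-th output
-- of every chunk, in the order the chunks were read.
fileContents :: Int -> Int -> String -> [String]
fileContents k size = map concat . transpose . map (fakeShares k) . chunksOf size
```

With this placeholder, fileContents 3 3 "ABCDEF" evaluates to ["0:ABC0:DEF", "1:ABC1:DEF", "2:ABC2:DEF"], i.e. one entry per output file. One caveat for the real pipeline: as far as I know, sourceFile makes no promise about the size of the chunks it yields, so a fixed chunk size like the 3 bytes above would probably need an explicit re-chunking step.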

EDIT:

Thank you for your answers. I took some time to understand what's going on with ZipSinks etc., and I've written a simple test program which takes the source file's input and simply writes it to 3 output files. Hopefully this will help others in the future.

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit 
import Safe (atMay)
import Text.Printf
import Filesystem.Path.CurrentOS (decodeString, encodeString)
import Control.Monad.Trans.Resource (runResourceT, ResourceT(..))

-- get the output file name given the base (file) path and the split number
getFileName :: FilePath -> Int -> FilePath
getFileName basePath splitNumber = decodeString $ encodeString basePath ++ "." ++ printf "%03d" splitNumber

-- Get the sink file, given a filepath generator (that takes an Int) and the split number
idxSinkFile :: MonadResource m
            => (Int -> FilePath)
            -> Int
            -> Consumer [ByteString] m ()
idxSinkFile mkFP splitNumber =
    concatMapC (flip atMay splitNumber) =$= sinkFile (mkFP splitNumber)

sinkMultiFiles :: MonadResource m
               => (Int -> FilePath)
               -> [Int]
               -> Sink [ByteString] m ()
sinkMultiFiles mkFP splitNumbers = getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) splitNumbers

-- Test conduit: replicate each incoming chunk num times, one copy per output file
simpleConduit :: Int -> Conduit ByteString (ResourceT IO) [ByteString]
simpleConduit num = mapC (replicate num)

main :: IO ()
main = do
    let mkFP = getFileName "test.txt"
        splitNumbers = [0..2]
    runResourceT $ sourceFile "test.txt" $$ simpleConduit (length splitNumbers) =$ sinkMultiFiles mkFP splitNumbers

Answer 1:

There are a number of ways to do it, depending on whether you want to dynamically grow the number of files you're writing to, or just keep a fixed number. Here's one example with a fixed list of files:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns      #-}
import           ClassyPrelude.Conduit
import           Safe                  (atMay)

idxSinkFile :: MonadResource m
            => (Int -> FilePath)
            -> Int
            -> Consumer [ByteString] m ()
idxSinkFile mkFP idx =
    concatMapC (flip atMay idx) =$= sinkFile fp
  where
    fp = mkFP idx

sinkMultiFiles :: MonadResource m
               => (Int -> FilePath)
               -> [Int]
               -> Sink [ByteString] m ()
sinkMultiFiles mkFP indices = getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) indices

someFunc :: ByteString -> [ByteString]
someFunc (decodeUtf8 -> x) = map encodeUtf8 [x, toUpper x, toLower x]

mkFP :: Int -> FilePath
mkFP 0 = "file0.txt"
mkFP 1 = "file1.txt"
mkFP 2 = "file2.txt"

src :: Monad m => Producer m ByteString
src = yieldMany $ map encodeUtf8 $ words "Hello There World!"

main :: IO ()
main = do
    let indices = [0..2]
    runResourceT $ src $$ mapC someFunc =$ sinkMultiFiles mkFP indices
    forM_ indices $ \idx -> do
        let fp = mkFP idx
        bs <- readFile fp
        print (fp, bs :: ByteString)

You can try this online with FP School of Haskell.
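For readers on a newer Conduit: the operators $$, =$= and =$ and the Sink/Consumer/Producer type synonyms used above are deprecated as of conduit 1.3. Below is a rough sketch of the same fixed-list idea written against the Conduit module with .| and runConduitRes. It assumes conduit 1.3 or later plus bytestring, and uses plain base instead of ClassyPrelude. The file naming in mkFP, the someFunc stand-in and the demo helper are illustrative choices, not part of the original answer.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.Char (toLower, toUpper)
import Data.Foldable (traverse_)
import Data.Maybe (listToMaybe)
import Data.Void (Void)

-- Sink for output number idx: from each incoming list, take element idx
-- (if the list is long enough) and stream it into that output's file.
idxSinkFile :: MonadResource m
            => (Int -> FilePath) -> Int -> ConduitT [B.ByteString] Void m ()
idxSinkFile mkPath idx =
    concatMapC (listToMaybe . drop idx) .| sinkFile (mkPath idx)

-- Combine one sink per index with ZipSink, so that every incoming list
-- is offered to all of the per-file sinks.
sinkMultiFiles :: MonadResource m
               => (Int -> FilePath) -> [Int] -> ConduitT [B.ByteString] Void m ()
sinkMultiFiles mkPath = getZipSink . traverse_ (ZipSink . idxSinkFile mkPath)

-- Hypothetical stand-in for the real processing step.
someFunc :: B.ByteString -> [B.ByteString]
someFunc x = [x, B8.map toUpper x, B8.map toLower x]

-- Hypothetical naming scheme for the output files.
mkFP :: Int -> FilePath
mkFP i = "file" ++ show i ++ ".txt"

-- Run the pipeline, then read the three files back for inspection.
demo :: IO [B.ByteString]
demo = do
    let indices = [0 .. 2]
    runConduitRes
        $ yieldMany ["Hello", "There", "World!"]
       .| mapC someFunc
       .| sinkMultiFiles mkFP indices
    mapM (B.readFile . mkFP) indices
```

Calling demo from main (for example main = demo >>= mapM_ print) writes file0.txt, file1.txt and file2.txt into the current directory. Since sinkFile writes chunks back to back with no separator, the files should contain "HelloThereWorld!", "HELLOTHEREWORLD!" and "hellothereworld!" respectively.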



Answer 2:

One possibility would be to let your algorithm output pairs like (Int, ByteString), where the Int is the index of the designated output file (you could of course use any other type as the key). This way, the conduit itself decides which file each piece of output is appended to.

import Data.Conduit
import qualified Data.Conduit.List as C
import qualified Data.Foldable as F

-- | Filter only pairs tagged with the appropriate key.
filterInputC :: (Monad m, Eq k) => k -> Conduit (k, a) m a
filterInputC idx = C.filter ((idx ==) . fst) =$= C.map snd

-- | Prepend a given sink with a filter.
filterInput :: (Monad m, Eq k) => k -> Sink a m r -> Sink (k, a) m r
filterInput idx = (filterInputC idx =$)

-- | Given a list of sinks, create a single sink that directs received values
-- depending on the index.
multiSink_ :: (Monad m) => [Sink a m ()] -> Sink (Int, a) m ()
multiSink_ = getZipSink . F.sequenceA_ . fmap ZipSink
             . zipWith filterInput [0..]

Update: The following example shows how multiSink_ could be used (the testing sinks just print everything to stdout with an appropriate prefix, instead of writing files).

-- | A testing sink that just prints its input, marking it with
-- a given prefix.
testSink :: String -> Sink String IO ()
testSink prefix = C.mapM_ (putStrLn . (prefix ++))

-- | An example that produces indexed output.
testSource :: (Monad m) => Source m (Int, String)
testSource = do
    yield (0, "abc")
    yield (0, "def")
    yield (1, "opq")
    yield (0, "0")
    yield (1, "1")
    yield (2, "rest")

main :: IO ()
main = testSource $$ multiSink_ (map testSink ["1: ", "2: ", "3: "])
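To connect this back to the question, where the processing step yields a whole list per input chunk: each list could be flattened into index-tagged pairs before it reaches a sink like multiSink_. The sketch below is a pure model using only base; tagShares and routeByIndex are made-up names. Inside a pipeline, the flattening step could presumably be written as C.concatMap (zip [0..]) with Data.Conduit.List.

```haskell
-- Tag each element of a result list with its position, so that the
-- element at position i is destined for output i.
tagShares :: [a] -> [(Int, a)]
tagShares = zip [0 ..]

-- Pure model of index-based routing: output k collects, in arrival
-- order, every payload tagged with k.
routeByIndex :: Int -> [(Int, a)] -> [[a]]
routeByIndex n tagged =
    [ [x | (i, x) <- tagged, i == k] | k <- [0 .. n - 1] ]
```

For example, routeByIndex 3 (concatMap tagShares [["133", "426", "765"], ["a", "b", "c"]]) gives [["133", "a"], ["426", "b"], ["765", "c"]], one list per output.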