Conduit - Multiple output file within the pipeline

I'm writing a programme where an input file is split into multiple files (Shamir's Secret Sharing Scheme).

Here's the pipeline I'm imagining:

  • source: use Conduit.Binary.sourceFile to read from the input
  • conduit: Takes a ByteString, produces [ByteString]
  • sink: Takes [ByteString] from the conduit and writes each ByteString to its corresponding file (if the input [ByteString] is called bsl, then bsl !! 0 is written to file 0, bsl !! 1 to file 1, and so on)

I found a question regarding multiple input files here, but in their case the whole pipeline is run once for each input file, whereas for my programme I'm writing to multiple output files within the pipeline.

I'm also looking through the Conduit source code here to see if I can implement a multiSinkFile myself, but I'm slightly confused by the Consumer type of sinkFile, and more so if I try to dig deeper... (I'm still a beginner)

So, the question is, how should I go about implementing a function like multiSinkFile which allows multiple files to be written as part of a sink?

Any tips are appreciated!

Clarification

Let's say we want to do Shamir's Secret Sharing on a file containing the binary value of "ABCDEF", splitting it into 3 parts.

(So we have our input file srcFile and our output files outFile0, outFile1 and outFile2.)

We first read "ABC" from the file and process it, which gives us a list of, say, ["133", "426", "765"]. So "133" will be written to outFile0, "426" to outFile1 and "765" to outFile2. Then we read "DEF" from srcFile, process it, and write the corresponding outputs to each output file.
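The data flow described above can be modelled with plain lists before any Conduit code is involved. This is only a sketch under assumptions: chunksOf, toShares and fileContents are hypothetical names, and toShares is a placeholder that prefixes the share number rather than computing real Shamir shares.

```haskell
import Data.List (transpose)

-- Split a list into consecutive chunks of at most n elements.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
  | n <= 0 || null xs = []
  | otherwise         = let (h, t) = splitAt n xs in h : chunksOf n t

-- Hypothetical placeholder for the real share computation: it returns
-- k outputs per chunk, each just the chunk prefixed by its share number.
toShares :: Int -> String -> [String]
toShares k chunk = [ show i ++ chunk | i <- [0 .. k - 1] ]

-- What each of the k output files would end up containing:
-- element i of every per-chunk list is appended to file i.
fileContents :: Int -> Int -> String -> [String]
fileContents k size =
  map concat . transpose . map (toShares k) . chunksOf size

main :: IO ()
main = mapM_ putStrLn (fileContents 3 3 "ABCDEF")
```

The transpose step is the part the sink has to reproduce in a streaming fashion: the i-th element of every chunk's output list belongs to the i-th file.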

EDIT:

Thank you for your answers. I took some time to understand what's going on with ZipSink etc., and I've written a simple test program which takes the source file's input and simply writes it to 3 output files. Hopefully this will help others in the future.

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit 
import Safe (atMay)
import Text.Printf
import Filesystem.Path.CurrentOS (decodeString, encodeString)
import Control.Monad.Trans.Resource (runResourceT, ResourceT(..))

-- get the output file name given the base (file) path and the split number
getFileName :: FilePath -> Int -> FilePath
getFileName basePath splitNumber = decodeString $ encodeString basePath ++ "." ++ printf "%03d" splitNumber

-- Get the sink file, given a filepath generator (that takes an Int) and the split number
idxSinkFile :: MonadResource m
            => (Int -> FilePath)
            -> Int
            -> Consumer [ByteString] m ()
idxSinkFile mkFP splitNumber =
    concatMapC (flip atMay splitNumber) =$= sinkFile (mkFP splitNumber)

sinkMultiFiles :: MonadResource m
               => (Int -> FilePath)
               -> [Int]
               -> Sink [ByteString] m ()
sinkMultiFiles mkFP splitNumbers = getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) splitNumbers

simpleConduit :: Int -> Conduit ByteString (ResourceT IO) [ByteString]
simpleConduit num = mapC (replicate num)

main :: IO ()
main = do
    let mkFP = getFileName "test.txt"
        splitNumbers = [0..2]
    runResourceT $ sourceFile "test.txt" $$ simpleConduit (length splitNumbers) =$ sinkMultiFiles mkFP splitNumbers
Cassady answered 19/4, 2014 at 11:30 Comment(2)
Could you be more specific about the output? Do you want to produce one complete file, then another, etc.? Or do you want to generate their content simultaneously, that is, some data to file 1, some data to file 2, then some data to file 1 again, etc.? - Kerriekerrigan
I've added some clarification to the question. Hope that helps. - Cassady

There are a number of ways to do it, depending on whether you want to dynamically grow the number of files you're writing to, or just keep a fixed number. Here's one example with a fixed list of files:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns      #-}
import           ClassyPrelude.Conduit
import           Safe                  (atMay)

idxSinkFile :: MonadResource m
            => (Int -> FilePath)
            -> Int
            -> Consumer [ByteString] m ()
idxSinkFile mkFP idx =
    concatMapC (flip atMay idx) =$= sinkFile fp
  where
    fp = mkFP idx

sinkMultiFiles :: MonadResource m
               => (Int -> FilePath)
               -> [Int]
               -> Sink [ByteString] m ()
sinkMultiFiles mkFP indices = getZipSink $ otraverse_ (ZipSink . idxSinkFile mkFP) indices

someFunc :: ByteString -> [ByteString]
someFunc (decodeUtf8 -> x) = map encodeUtf8 [x, toUpper x, toLower x]

mkFP :: Int -> FilePath
mkFP 0 = "file0.txt"
mkFP 1 = "file1.txt"
mkFP 2 = "file2.txt"

src :: Monad m => Producer m ByteString
src = yieldMany $ map encodeUtf8 $ words "Hello There World!"

main :: IO ()
main = do
    let indices = [0..2]
    runResourceT $ src $$ mapC someFunc =$ sinkMultiFiles mkFP indices
    forM_ indices $ \idx -> do
        let fp = mkFP idx
        bs <- readFile fp
        print (fp, bs :: ByteString)

You can try this online with FP School of Haskell.
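For readers on newer conduit releases, the same idea can be sketched without ClassyPrelude. This assumes conduit 1.3 or later, where the Conduit module provides runConduitRes, (.|) and ConduitT and the older $$, =$ and =$= operators are deprecated. The file names part0.txt to part2.txt are made up for illustration, and listToMaybe . drop idx stands in for flip atMay idx (it only matches atMay for non-negative indices).

```haskell
import           Conduit
import           Data.ByteString       (ByteString)
import qualified Data.ByteString.Char8 as BC
import           Data.Char             (toLower, toUpper)
import           Data.Foldable         (traverse_)
import           Data.Maybe            (listToMaybe)
import           Data.Void             (Void)

-- A sink that keeps only element idx of every incoming list
-- and writes it to the file chosen by mkFP.
idxSinkFile :: MonadResource m
            => (Int -> FilePath) -> Int -> ConduitT [ByteString] o m ()
idxSinkFile mkFP idx =
  concatMapC (listToMaybe . drop idx) .| sinkFile (mkFP idx)

-- Combine one sink per index; ZipSink feeds every input to all of them.
sinkMultiFiles :: MonadResource m
               => (Int -> FilePath) -> [Int] -> ConduitT [ByteString] Void m ()
sinkMultiFiles mkFP = getZipSink . traverse_ (ZipSink . idxSinkFile mkFP)

-- Stand-in for the real processing step: three variants per input chunk.
someFunc :: ByteString -> [ByteString]
someFunc x = [x, BC.map toUpper x, BC.map toLower x]

-- Hypothetical output file names.
mkFP :: Int -> FilePath
mkFP i = "part" ++ show i ++ ".txt"

main :: IO ()
main = do
  let indices = [0 .. 2]
  runConduitRes $
       yieldMany (map BC.pack (words "Hello There World!"))
    .| mapC someFunc
    .| sinkMultiFiles mkFP indices
  -- Read the files back to inspect what each sink received.
  mapM_ (\i -> BC.readFile (mkFP i) >>= \bs -> print (mkFP i, bs)) indices
```

Because mkFP here is total, asking for more indices than someFunc produces just creates empty files instead of failing on a missing pattern.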

Wayside answered 20/4, 2014 at 9:32 Comment(4)
Thanks for your answer. Am I correct in saying that you're using idxSinkFile to create sinks that each take only their corresponding ByteString from the given [ByteString], and then using ZipSink + otraverse_ to create a single sink out of all of them? - Cassady
On idxSinkFile: when we run concatMapC (flip atMay idx) over, say, ["aaa", "bbb", "ccc"] where idx is 0, we end up with "aaa" being yielded to our sink file fp (file0.txt). - Cassady
To your first comment: yes. To your second comment: I'm not sure if it's a question. - Wayside
The second one is more of an explanation to myself (and to everyone else who sees this in the future). Please correct me if I'm wrong ^^ - Cassady
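The selection step discussed in these comments can be checked on plain lists. The sketch below is a pure model only: atMay' is a local stand-in for Safe.atMay, and selectIdx (a hypothetical name) plays the role of concatMapC (flip atMay idx) applied to the whole stream at once.

```haskell
import Data.Maybe (listToMaybe, mapMaybe)

-- Local stand-in for Safe.atMay: safe indexing that returns Nothing
-- when the index is negative or past the end of the list.
atMay' :: [a] -> Int -> Maybe a
atMay' xs i
  | i < 0     = Nothing
  | otherwise = listToMaybe (drop i xs)

-- Pure model of the filtering step: from every list in the stream,
-- keep element idx if it exists and silently skip the list otherwise.
selectIdx :: Int -> [[a]] -> [a]
selectIdx idx = mapMaybe (`atMay'` idx)

main :: IO ()
main = do
  let stream = [["aaa", "bbb", "ccc"], ["ddd", "eee", "fff"], ["x"]]
  print (selectIdx 0 stream)
  print (selectIdx 1 stream)
  print (selectIdx 5 stream)
```

Note that a list that is too short contributes nothing for that index, so a processing step that emits fewer elements than expected leaves the corresponding file without that chunk rather than raising an error.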

One possibility would be to let your algorithm output something like (Int, ByteString), where Int is the index of a designated output file (of course you could use any other type as the key). This way, the conduit can decide to what file it wants to append its output.

import Data.Conduit
import qualified Data.Conduit.List as C
import qualified Data.Foldable as F

-- | Filter only pairs tagged with the appropriate key.
filterInputC :: (Monad m, Eq k) => k -> Conduit (k, a) m a
filterInputC idx = C.filter ((idx ==) . fst) =$= C.map snd

-- | Prepend a given sink with a filter.
filterInput :: (Monad m, Eq k) => k -> Sink a m r -> Sink (k, a) m r
filterInput idx = (filterInputC idx =$)

-- | Given a list of sinks, create a single sink that directs received values
-- depending on the index.
multiSink_ :: (Monad m) => [Sink a m ()] -> Sink (Int, a) m ()
multiSink_ = getZipSink . F.sequenceA_ . fmap ZipSink
             . zipWith filterInput [0..]

Update: The following example shows how multiSink_ could be used (the testing sinks just print everything to stdout with an appropriate prefix, instead of writing files).

-- | A testing sink that just prints its input, marking it with
-- a given prefix.
testSink :: String -> Sink String IO ()
testSink prefix = C.mapM_ (putStrLn . (prefix ++))

-- | An example that produces indexed output.
testSource :: (Monad m) => Source m (Int, String)
testSource = do
    yield (0, "abc")
    yield (0, "def")
    yield (1, "opq")
    yield (0, "0")
    yield (1, "1")
    yield (2, "rest")

main :: IO ()
main = testSource $$ multiSink_ (map testSink ["1: ", "2: ", "3: "])
Kerriekerrigan answered 20/4, 2014 at 16:25 Comment(3)
In your code, are you suggesting that in my processing conduit I run multiple yield commands, each with a different (Int, ByteString) pair, to achieve multiple file output? - Cassady
@jtcwang Exactly. This allows your conduit to decide to which file it needs to write and when. So you can write several chunks to one file, then several other chunks to another, or interlace them, without any restrictions. I'm adding a small example to the code. - Kerriekerrigan
@PetrPudlák, thanks for your answer. Though I've accepted Michael's answer, your alternative provides a way to implement conditional streaming to multiple files. - Cassady
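The routing behaviour of the tagged approach can also be modelled on plain lists. In this sketch tagShares and routeByIndex are hypothetical helper names: the first shows how a [ByteString]-style result could be turned into tagged pairs, the second collects what each of n sinks would receive.

```haskell
-- Tag each element of a per-chunk result with its position,
-- so that element i is routed to output i.
tagShares :: [a] -> [(Int, a)]
tagShares = zip [0 ..]

-- Pure model of the routing: output i receives, in order,
-- every value tagged with i; other tags are ignored.
routeByIndex :: Int -> [(Int, a)] -> [[a]]
routeByIndex n tagged =
  [ [ x | (k, x) <- tagged, k == i ] | i <- [0 .. n - 1] ]

main :: IO ()
main = do
  let tagged = [ (0, "abc"), (0, "def"), (1, "opq")
               , (0, "0"), (1, "1"), (2, "rest") ]
  mapM_ print (routeByIndex 3 tagged)
  print (tagShares ["133", "426", "765"])
```

Inside a conduit, the fixed-position case from the question could then be expressed by yielding each pair of tagShares, while still leaving room to send several chunks to the same output.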