I've been struggling with this problem for a while now, though to be fair, I've learned a lot about Conduit, since previously I was mostly using canned examples with a few exceptions.
The basic problem is framed like this: for conduits A, B, and C, I have A .| B (A feeds into B) and A .| C, and finally I need a function that takes B and C and produces an intermediate Conduit, call it Merge B C, so that I can do (Merge B C) .| D. My experience with FRP/streaming libraries in non-Haskell languages suggests that there are several different ways to go about doing a "Merge" (e.g. the zip family of operations, or "sample on", which only produces new elements for D when one or more selected input conduits have a new value, etc). I think my trouble is understanding how to do that in Conduit, if it is supported.
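To make the "Merge B C" shape concrete, here is a minimal sketch using ZipConduit from Data.Conduit (re-exported by Conduit), which hands each input to every wrapped conduit and combines their outputs into one stream. The names viaB, viaC, and merged are hypothetical stand-ins, and tagging the two branches with Either is an assumption made only so that both branches share one output type; this may or may not be the merge semantics wanted here.

```haskell
import Conduit
import Data.Either (partitionEithers)

-- Hypothetical stand-ins: viaB emits one value per input, viaC emits several.
viaB :: Monad m => ConduitT Int (Either String Int) m ()
viaB = mapC (Left . show)

viaC :: Monad m => ConduitT Int (Either String Int) m ()
viaC = concatMapC (\n -> map Right [n, n * 10])

-- Every input is handed to both branches; their outputs share one stream.
merged :: Monad m => ConduitT Int (Either String Int) m ()
merged = getZipConduit (ZipConduit viaB *> ZipConduit viaC)

-- Collect the merged stream and split it back up by branch.
mergedParts :: ([String], [Int])
mergedParts =
  partitionEithers (runConduitPure (yieldMany [1, 2] .| merged .| sinkList))

main :: IO ()
main = print mergedParts
```

A downstream D would then be attached as merged .| d and would have to pattern match on Left and Right itself, so this covers "fan out and recombine" but not any pairing of B values with C values.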
Even more specifically for my particular problem today, B
has a 1:1 relationship with A
, whereas C
has a many:1 relationship with A
, and ultimately in D
I want repeated elements of B
combined with the corresponding elements of C
: if a~b
and a~c
for a
in A
, b
in B
, and c
in C
, then (b,c)
are fed into D
. So I was able to use ZipSink
and the fact that it is actually a reasonable place to do a sink (performance aside, which I haven't looked at). Of course, as completely expected, getZipSink
does not know anything about one to many relationships and how to deal with them; it has the widely defined behavior of zip to just cycle through input streams until all input streams have been cycled through once.
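The mismatch can be reproduced on a small scale without any files. The sketch below uses only the conduit package; nameInfo and rowsOf are hypothetical stand-ins for processFileName and processCSV, with made-up data chosen so that one "file" has two rows.

```haskell
import Conduit

-- Hypothetical stand-ins for the real per-file work.
nameInfo :: FilePath -> String   -- role of B: one value per file
nameInfo = takeWhile (/= '.')

rowsOf :: FilePath -> [Int]      -- role of C: several values per file
rowsOf "a.csv" = [1, 2]
rowsOf _       = [3]

-- Both sinks see every file path; each result list is built independently.
collected :: ([String], [Int])
collected = runConduitPure $
  yieldMany ["a.csv", "b.csv"]
    .| getZipSink ((,) <$> ZipSink (mapC nameInfo .| sinkList)
                       <*> ZipSink (concatMapC rowsOf .| sinkList))

-- Positional zip pairs the lists index by index and stops at the shorter one.
zipped :: [(String, Int)]
zipped = uncurry zip collected

main :: IO ()
main = do
  print collected  -- (["a","b"],[1,2,3])
  print zipped     -- [("a",1),("b",2)]
```

The row 2 from "a.csv" ends up paired with "b", and the row from "b.csv" is dropped, which is the misalignment described above.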
I suppose one way to do this might be to somehow change my one-to-many stream into a one-to-one stream by folding each group into something like a list. But then I'd have to unpack it later, outside of the conduit context. At this point, I just want to ask what the recommended way(s) are.
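One possible alternative to folding, sketched here under assumptions rather than offered as the recommended approach, is to do the pairing per input element inside a single conduit: for each file, compute the B value once and attach it to every C value produced from that same file. The names nameInfo, rowsOf, and pairPerFile are hypothetical, and rowsOf uses in-memory data in place of actually reading and parsing a CSV file.

```haskell
import Conduit

-- Hypothetical stand-ins for the real per-file work.
nameInfo :: FilePath -> String   -- role of B: one value per file
nameInfo = takeWhile (/= '.')

rowsOf :: FilePath -> [Int]      -- role of C: several values per file
rowsOf "a.csv" = [1, 2]
rowsOf _       = [3]

-- For each file, emit one (b, c) pair per row of that file.
pairPerFile :: Monad m => ConduitT FilePath (String, Int) m ()
pairPerFile = awaitForever $ \fp ->
  let b = nameInfo fp
   in yieldMany (rowsOf fp) .| mapC (\c -> (b, c))

paired :: [(String, Int)]
paired = runConduitPure $
  yieldMany ["a.csv", "b.csv"] .| pairPerFile .| sinkList

main :: IO ()
main = print paired  -- [("a",1),("a",2),("b",3)]
```

Because the pairs stay in the stream, a downstream D can consume them directly with pairPerFile .| d, and nothing has to be unpacked outside the conduit. In the real code the inner yieldMany (rowsOf fp) would presumably be replaced by a streaming source of that file's CSV rows.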
My actual code looks like this (A is sourceDirectoryDeep, B is processFileName, C is processCSV, and D is (sort of, I suppose) getZipSink):
retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
  rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
    .| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV)
    & runConduitRes
  print rows
  rows & fmap fromRow & catMaybes & return
  where
    combine :: Vector (MapRow Text) -> Vector (MapRow Text) -> Vector (MapRow Text)
    combine v1 v2 = uncurry DM.union <$> zip v1 v2

    processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)
               => ConduitT FilePath Void m (Vector (MapRow Text))
    processCSV = mapMC (liftIO . DTIO.readFile)
              .| intoCSV defCSVSettings
              .| sinkVector

    processFileName :: (MonadResource m, MonadThrow m, PrimMonad m)
                    => ConduitT FilePath Void m (Vector (MapRow Text))
    processFileName = mapC go
                   .| sinkVector
      where
        go :: FilePath -> MapRow Text
        go fp = takeFileName fp
              & takeWhile (/= '.')
              & splitOn "_"
              & fmap Txt.pack
              & zip colNames
              & DM.fromList
        colNames = [markKey, idKey]
The imports (some of which may be extraneous) are:
import Conduit
import qualified Data.Conduit.Combinators as DCC
import Data.CSV.Conduit
import Data.Function ((&))
import Data.List.Split (splitOn)
import Data.Map as DM
import Data.Text (Text)
import qualified Data.Text as Txt
import qualified Data.Text.IO as DTIO
import Data.Vector (Vector)
import Path
import System.FilePath.Posix