How to merge one-to-one and one-to-many input:outp

Posted 2019-08-21 07:45

I've been struggling with this problem for a while now, though to be fair, I've learned a lot about Conduit, since previously I was mostly using canned examples with a few exceptions.

The basic problem is framed like this for conduits A, B, and C: A .| B (A feeds into B) and A .| C, and finally I need a function that takes B and C and produces an intermediate conduit, call it Merge B C, so that I can do (Merge B C) .| D. My experience with FRP/streaming libraries in non-Haskell languages suggests that there are several different ways to do a "merge" (e.g. the zip family of operations, or "sample on", which only produces new elements for D when one or more selected input conduits have a new value). I think my trouble is understanding how to do that in Conduit, if it is supported.
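One shape such a merge could take is sketched below. It assumes the `ZipConduit` wrapper that `Data.Conduit` exports alongside `ZipSink`, which hands every upstream value to each wrapped conduit and coalesces their outputs into a single stream. `stageB`, `stageC`, and `mergeBC` are hypothetical stand-ins for B, C, and Merge B C, and the `Either` tag is just one way to let D tell the two sources apart.

```haskell
import Conduit

-- Hypothetical stand-in for B: one output per input.
stageB :: Monad m => ConduitT Int String m ()
stageB = mapC (\n -> "b" ++ show n)

-- Hypothetical stand-in for C: several outputs per input.
stageC :: Monad m => ConduitT Int String m ()
stageC = concatMapC (\n -> ["c" ++ show n ++ "x", "c" ++ show n ++ "y"])

-- Feed every upstream value to both stages and coalesce their outputs
-- into one stream, tagged so a downstream stage can tell them apart.
mergeBC :: Monad m => ConduitT Int (Either String String) m ()
mergeBC = getZipConduit $
  ZipConduit (stageB .| mapC Left) *> ZipConduit (stageC .| mapC Right)

-- Everything the merged stage emits for a small input.
merged :: [Either String String]
merged = runConduitPure (yieldMany [1, 2] .| mergeBC .| sinkList)
```

Within each branch the outputs keep their order, but this sketch makes no promise about how the `Left` and `Right` values interleave, so it is a merge rather than a pairing of corresponding elements.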

Even more specifically, for my particular problem today, B has a 1:1 relationship with A, whereas C has a one-to-many relationship with A (each element of A yields many elements of C), and ultimately in D I want each element of B repeated and combined with the corresponding elements of C: if a~b and a~c for a in A, b in B, and c in C, then (b,c) is fed into D. So far I have been able to use ZipSink, relying on the fact that this is actually a reasonable place to do a sink (performance aside, which I haven't looked at). Of course, as completely expected, getZipSink does not know anything about one-to-many relationships or how to deal with them; it has the standard zip behavior of just cycling through the input streams until all of them have been cycled through once.
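To make the mismatch concrete, here is a small sketch with in-memory data instead of files. `files`, `names`, and `rows` are hypothetical stand-ins: each "file" is a name plus the rows it contains, `names` plays the role of B, and `rows` plays the role of C.

```haskell
import Conduit

-- Hypothetical input: each "file" is a name plus the rows it contains.
files :: [(String, [String])]
files = [("a", ["a1", "a2"]), ("b", ["b1"])]

-- Plays B: one name per file.
names :: Monad m => ConduitT (String, [String]) Void m [String]
names = mapC fst .| sinkList

-- Plays C: many rows per file.
rows :: Monad m => ConduitT (String, [String]) Void m [String]
rows = concatMapC snd .| sinkList

-- Run both sinks side by side over the same input with ZipSink.
collected :: ([String], [String])
collected = runConduitPure $
  yieldMany files .| getZipSink ((,) <$> ZipSink names <*> ZipSink rows)

-- Zipping the two results pairs them by position, not by source file.
positional :: [(String, String)]
positional = uncurry zip collected
-- [("a","a1"),("b","a2")]
```

The pairing I am after for this input would be `[("a","a1"),("a","a2"),("b","b1")]`: by the time the two sinks return, the link between a row and the file it came from is already gone.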

I suppose one way to do this might be to somehow change my one-to-many stream into a one-to-one stream by folding it into something like a list. But then I'd have to unpack it later, outside of the conduit context. At this point, I just want to ask what the recommended approach (or approaches) would be.
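A rough sketch of that fold-into-a-list idea, with the unpacking kept inside the conduit, might look like the following. `pairWith` is a hypothetical helper, not part of the conduit library: for each upstream value it runs the one-to-many conduit on just that value, collects the results into a list, and re-emits them paired with the 1:1 result.

```haskell
import Conduit
import Control.Monad.Trans.Class (lift)

-- Hypothetical helper: `toB` plays the 1:1 stage, `inner` the
-- one-to-many stage. Each upstream value is pushed through `inner`
-- on its own, so the outputs can be tagged with the matching `toB` value.
pairWith :: Monad m => (a -> b) -> ConduitT a c m () -> ConduitT a (b, c) m ()
pairWith toB inner = awaitForever $ \a -> do
  -- Fold the one-to-many outputs for this single input into a list.
  cs <- lift (runConduit (yield a .| inner .| sinkList))
  -- Unpack the list again, still inside the conduit.
  yieldMany (map (\c -> (toB a, c)) cs)

-- Small demonstration: n is paired with each of 1..n.
demo :: [(String, Int)]
demo = runConduitPure $
  yieldMany [2, 3 :: Int] .| pairWith show (concatMapC (\n -> [1 .. n])) .| sinkList
-- [("2",1),("2",2),("3",1),("3",2),("3",3)]
```

The trade-off in this sketch is that all outputs for one input are held in memory at once, and `inner` is restarted for every input, so it only suits stages that do not carry state across inputs.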

My actual code looks like this (A is sourceDirectoryDeep, B is processFileName, C is processCSV, and D is (sort of, I suppose) getZipSink):

retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
  rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
    .| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
    & runConduitRes
  print rows
  rows & fmap fromRow & catMaybes & return
  where
    combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
    combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
    processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processCSV = mapMC (liftIO . DTIO.readFile)
      .| intoCSV defCSVSettings
      .| sinkVector
    processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processFileName = mapC go
      .| sinkVector
      where
        go :: FilePath -> MapRow Text
        go fp = takeFileName fp
          & takeWhile (/= '.')
          & splitOn "_"
          & fmap Txt.pack
          & zip colNames
          & DM.fromList
        colNames = [markKey, idKey]

The imports (some of which may be extraneous) are:

import           Conduit
import qualified Data.Conduit.Combinators       as DCC
import           Data.CSV.Conduit
import           Data.Function                  ((&))
import           Data.List.Split                (splitOn)
import qualified Data.Map                       as DM
import           Data.Text                      (Text)
import qualified Data.Text                      as Txt
import qualified Data.Text.IO                   as DTIO
import           Data.Vector                    (Vector)
import           Path
import           System.FilePath.Posix
