I write a program which counts the frequencies of NGrams in a corpus. I already have a function that consumes a stream of tokens and produces NGrams of one single order:
ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
At the moment i just can connect one stream consumer to a stream source:
tokens --- trigrams --- countFreq
How do I connect multiple stream consumers to the same stream source? I would like to have something like this:
.--- unigrams --- countFreq
|--- bigrams --- countFreq
tokens ----|--- trigrams --- countFreq
'--- ... --- countFreq
A plus would be to run each consumer in parallel
EDIT: Thanks to Petr I came up with this solution
spawnMultiple orders = do
chan <- atomically newBroadcastTMChan
results <- forM orders $ \_ -> newEmptyMVar
threads <- forM (zip results orders) $
forkIO . uncurry (sink chan)
forkIO . runResourceT $ sourceFile "test.txt"
$$ javascriptTokenizer
=$ sinkTMChan chan
forM results readMVar
where
sink chan result n = do
chan' <- atomically $ dupTMChan chan
freqs <- runResourceT $ sourceTMChan chan'
$$ ngram n
=$ frequencies
putMVar result freqs