Running parallel URL downloads in Haskell

Below is Haskell code that downloads, over HTTP, the files that are missing from a given directory:

module Main where

import Control.Monad ( filterM
                     , liftM
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
                    , rspBody
                    , simpleHTTP
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
                 , hPutStr
                 , hPutStrLn
                 , IOMode(WriteMode)
                 , openFile
                 , stderr
import Text.Printf ( printf )

-- | Range labels identifying the pages to download.
--
-- Ten 1000-wide ranges (@"1-1000"@ .. @"9001-10000"@), followed by fifteen
-- 2000-wide ranges (@"10001-12000"@ .. @"38001-40000"@), followed by the
-- final, shorter range @"40001-41284"@.
indices :: [String]
indices =
  map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
  where
    -- 1000-wide range number @index@, counting from zero.
    format1 :: Int -> String
    format1 index =
      printf "%d-%d" (index * 1000 + 1)
                     ((index + 1) * 1000)
    -- 2000-wide range number @index@, counting from zero, starting after 10000.
    format2 :: Int -> String
    format2 index =
      printf "%d-%d" (10000 + 2 * index * 1000 + 1)
                     (10000 + (2 * index + 2) * 1000)

-- | Entry point. Expects exactly one command-line argument: the directory
-- that holds (or will hold) the downloaded files.
main :: IO ()
main = do
  args <- getArgs
  case args of
    [dir] -> updateDownloads dir
    -- Fail with a readable message instead of an opaque pattern-match
    -- failure when the argument count is wrong.
    _ -> ioError (userError "usage: program DIRECTORY")

-- | Download, one after another, every page whose target file does not yet
-- exist under @path@.
--
-- Target files are named @tv_and_movie_freqlist\<range\>.html@, one per
-- entry of 'indices'. Connection errors and unparsable URLs are reported on
-- stderr and the affected page is skipped.
updateDownloads :: FilePath -> IO ()
updateDownloads path = do
  let
    fileNames = map (\index ->
      (index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
  missing <-
    filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
  mapM_ download missing
  where
    -- Fetch one page and store its body in the given file.
    download :: (String, FilePath) -> IO ()
    download (index, fileName) = do
      let
        -- NOTE(review): the base URL literal is empty in the source as
        -- shown; restore the real prefix before running this.
        url =
          "" ++
          index
      -- Pattern-match on the parse result rather than using fromJust, so a
      -- malformed URL is reported instead of crashing the whole run.
      case parseURI url of
        Nothing -> hPutStrLn stderr $ "Invalid URL " ++ show url
        Just uri -> do
          let
            request =
              Request
                { rqURI = uri
                , rqMethod = GET
                , rqHeaders = []
                , rqBody = ""
                }
          hPutStrLn stderr $ "Downloading " ++ show url
          resp <- simpleHTTP request
          case resp of
            Left _ -> hPutStrLn stderr $ "Error connecting to " ++ show url
            -- writeFile closes the handle even if the write fails, unlike a
            -- manual openFile / hPutStr / hClose sequence.
            Right response -> writeFile fileName (rspBody response)

I would like to run the downloads in parallel. I know about par, but am not sure if it can be used in the IO monad, and if so, how?

UPDATE: Here is my code reimplemented using Control.Concurrent.Async and mapConcurrently:

module Main where

import Control.Concurrent.Async ( mapConcurrently )
import Control.Monad ( filterM
                     , liftM
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
                    , rspBody
                    , simpleHTTP
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
                 , hPutStr
                 , hPutStrLn
                 , IOMode(WriteMode)
                 , openFile
                 , stderr
import Text.Printf ( printf )

-- | Range labels identifying the pages to download.
--
-- Ten 1000-wide ranges (@"1-1000"@ .. @"9001-10000"@), followed by fifteen
-- 2000-wide ranges (@"10001-12000"@ .. @"38001-40000"@), followed by the
-- final, shorter range @"40001-41284"@.
indices :: [String]
indices =
  map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
  where
    -- 1000-wide range number @index@, counting from zero.
    format1 :: Int -> String
    format1 index =
      printf "%d-%d" (index * 1000 + 1)
                     ((index + 1) * 1000)
    -- 2000-wide range number @index@, counting from zero, starting after 10000.
    format2 :: Int -> String
    format2 index =
      printf "%d-%d" (10000 + 2 * index * 1000 + 1)
                     (10000 + (2 * index + 2) * 1000)

-- | Entry point. Expects exactly one command-line argument: the directory
-- that holds (or will hold) the downloaded files.
main :: IO ()
main = do
  args <- getArgs
  case args of
    [dir] -> updateDownloads dir
    -- Fail with a readable message instead of an opaque pattern-match
    -- failure when the argument count is wrong.
    _ -> ioError (userError "usage: program DIRECTORY")

-- | Concurrently download every page whose target file does not yet exist
-- under @path@, then write the fetched pages to disk.
--
-- Target files are named @tv_and_movie_freqlist\<range\>.html@, one per
-- entry of 'indices'. All fetches are started through 'mapConcurrently';
-- files are written sequentially once every fetch has finished. Pages that
-- could not be fetched are reported on stderr and skipped.
updateDownloads :: FilePath -> IO ()
updateDownloads path = do
  let
    fileNames = map (\index ->
      (index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
  missing <-
    filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
  pages <-
    mapConcurrently (\(index, fileName) -> getUrl index fileName) missing
  -- Only successful downloads are written. A failure used to be encoded as
  -- ("", ""), which made the write step try to open the file "".
  mapM_ (\(fileName, html) -> writeFile fileName html)
        [page | Just page <- pages]
  where
    -- Fetch one page; 'Nothing' means the fetch failed and was reported.
    getUrl :: String -> FilePath -> IO (Maybe (FilePath, String))
    getUrl index fileName = do
      let
        -- NOTE(review): the base URL literal is empty in the source as
        -- shown; restore the real prefix before running this.
        url =
          "" ++
          index
      -- Pattern-match on the parse result rather than using fromJust, so a
      -- malformed URL is reported instead of crashing the worker.
      case parseURI url of
        Nothing -> do
          hPutStrLn stderr $ "Invalid URL " ++ show url
          return Nothing
        Just uri -> do
          let
            request =
              Request
                { rqURI = uri
                , rqMethod = GET
                , rqHeaders = []
                , rqBody = ""
                }
          resp <- simpleHTTP request
          case resp of
            Left _ -> do
              hPutStrLn stderr $ "Error connecting to " ++ show url
              return Nothing
            Right response ->
              return (Just (fileName, rspBody response))

Have a look at mapConcurrently from Simon Marlow's "async" library.

It maps an IO action over the elements of a Traversable container, running all the actions concurrently, and waits for all of them to finish.


{-# LANGUAGE PackageImports #-}

import System.Environment (getArgs)

import "async" Control.Concurrent.Async (mapConcurrently)

import "HTTP" Network.HTTP
import "HTTP" Network.Stream (Result)
import "HTTP" Network.HTTP.Base (Response(..))
import System.IO
import "url" Network.URL (encString)

import Control.Monad

-- | Request @url@ via 'simpleHTTP' and pair the outcome with the URL it
-- belongs to, so results can still be identified after running concurrently.
getURL :: String -> IO (String, Result (Response String))
getURL url = fmap ((,) url) (simpleHTTP (getRequest url))

-- | Fetch every URL given on the command line concurrently, then report
-- each result and save each successful response body to a file whose name
-- is derived from the URL.
main :: IO ()
main = getArgs >>= run
  where
    run [] = putStrLn "usage: program url1 url2 ... urlN"
    run urls = mapConcurrently getURL urls >>= mapM_ save

    -- Report one result; on success also write the body to disk.
    save (url, Left connError) = putStrLn $ url ++ "; " ++ show connError
    save (url, Right response) = do
      putStrLn $ url ++ "; OK"
      writeFile (fileNameFor url) (rspBody response)

    -- make name from url
    fileNameFor url = encString True (`notElem` ":/") url ++ ".html"
Another version that uses async's mapConcurrently and the http-conduit keep-alive manager:

{-# LANGUAGE PackageImports, FlexibleContexts #-}

import System.Environment (getArgs)

import "http-conduit" Network.HTTP.Conduit
import qualified "conduit" Data.Conduit as C
import "http-types" Network.HTTP.Types.Status (ok200)

import "async" Control.Concurrent.Async (mapConcurrently)
import qualified "bytestring" Data.ByteString.Lazy as LBS
import qualified "bytestring" Data.ByteString as BS
import "transformers" Control.Monad.Trans.Class (lift)
import "transformers" Control.Monad.IO.Class (liftIO)
import "url" Network.URL (encString)
import "failure" Control.Failure (Failure(..))

import Control.Monad
import System.IO

-- | Parse @url@ into a request and keep the original URL string alongside it.
taggedRequest :: Failure HttpException m => String -> m (String, Request m')
taggedRequest url = parseUrl url >>= \req -> return (url, req)

-- | Run a tagged request through the given manager with 'httpLbs' and return
-- the response paired with the URL tag it arrived with.
taggedResult :: (C.MonadBaseControl IO m, C.MonadResource m) => Manager -> (String, Request m) -> m (String, Response LBS.ByteString)
taggedResult manager (url, req) =
        httpLbs req manager >>= \res -> return (url, res)

-- | Fetch every URL given on the command line concurrently through one
-- shared manager, print each URL with its response status, and save the
-- body of every response whose status equals 'ok200'.
main :: IO ()
main = do
  urls <- getArgs
  if null urls
    then putStrLn "usage: program url1 url2 ... urlN"
    else do
      requests <- mapM taggedRequest urls
      withManager $ \manager -> liftIO $
        mapConcurrently (C.runResourceT . taggedResult manager) requests
          >>= mapM_ report
  where
    -- Print the status for one URL and store the body when it is 200 OK.
    report (url, Response status _ _ bsBody) = do
      putStrLn $ url ++ " ; " ++ show status
      when (status == ok200) $
        LBS.writeFile (encString True (`notElem` ":/") url ++ ".html") bsBody
Since the operations involve IO, you typically would /not/ use par for this, as it doesn't do anything to IO actions.

You will need an explicit concurrency model, to hide the latency of downloading.

I'd recommend MVars or TVars, combined with forkIO.

A work queue abstraction is often useful for this style of problem: push all URLs into a queue, and have a fixed set of worker threads (e.g. N * k threads for N cores) take jobs from it until the queue is empty. Completed work is then appended to a communication channel that hands it back to the main thread.

Here's an example from a parallel URL checker, using channels.

This looks like exactly what async is designed for; in fact, its example is for parallel downloads. There is a presentation on this too, which is well worth checking out.

