In threads package in module Control.Concurrent.Thread.Group
there is a function forkIO
:
forkIO :: ThreadGroup -> IO α -> IO (ThreadId, IO (Result α))
I'd like to lift it using MonadBaseControl
from monad-control. Here is my attempt:
fork :: (MonadBase IO m) => TG.ThreadGroup -> m α -> m (ThreadId, m (Result α))
fork tg action = control (\runInBase -> TG.forkIO tg (runInBase action))
and here is the error messsage:
Couldn't match type `(ThreadId, IO (Result (StM m α)))'
with `StM m (ThreadId, m (Result α))'
Expected type: IO (StM m (ThreadId, m (Result α)))
Actual type: IO (ThreadId, IO (Result (StM m α)))
In the return type of a call of `TG.forkIO'
In the expression: TG.forkIO tg (runInBase action)
In the first argument of `control', namely
`(\ runInBase -> TG.forkIO tg (runInBase action))'
What to change to make the types match?
The main problem is the IO a
argument to forkIO
. To fork an m a
action in IO
we'd need a way to run an m a
to an IO a
. To do this, we could try to make the class of monads that have a runBase :: MonadBase b m => m a -> b a
method, but very few interesting transformers can provide that. If we consider for example the StateT
transformer, it could figure out how to run something in the base monad with runStateT
if it's first given an opportunity to observe its own state.
runFork :: Monad m => StateT s m a -> StateT s m (m b)
runFork x = do
s <- get
return $ do
(a, s') <- runStateT x s
return a
This suggests the type runForkBase :: MonadBase b m => m a -> m (b a)
, which we will settle on for the following type class.
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
import Control.Monad.Base
class (MonadBase b m) => MonadRunForkBase b m | m -> b where
runForkBase :: m a -> m (b a)
I added the word Fork
to the name to emphasize that the future state changes will not in general be shared between the two futures. For this reason, the few interesting transformers like WriterT
that could have provided a runBase
only provide an uninteresting runBase
; they produce side effects that will never be observable.
We can write something like fork
for anything with the limited form of lowering provided by a MonadRunForkBase IO m
instance. I'm going to lift
the normal forkIO
from base rather than the one from threads, which you can do the same way.
{-# LANGUAGE FlexibleContexts #-}
import Control.Concurrent
forkInIO :: (MonadRunForkBase IO m) => m () -> m ThreadId
forkInIO action = runForkBase action >>= liftBase . forkIO
Instances
This raises the question, "What transformers can we provide MonadRunForkBase
instances for"? Straight off the bat, we can trivially provide them for any of the base monads that have MonadBase
instances
import Control.Monad.Trans.Identity
import GHC.Conc.Sync (STM)
instance MonadRunForkBase [] [] where runForkBase = return
instance MonadRunForkBase IO IO where runForkBase = return
instance MonadRunForkBase STM STM where runForkBase = return
instance MonadRunForkBase Maybe Maybe where runForkBase = return
instance MonadRunForkBase Identity Identity where runForkBase = return
For transformers, it's usually easier to build up functionality like this step-by-step. Here's the class of transformers that can run a fork in the immediately underlying monad.
import Control.Monad.Trans.Class
class (MonadTrans t) => MonadTransRunFork t where
runFork :: Monad m => t m a -> t m (m a)
We can provide a default implementation for running all the way down in the base
runForkBaseDefault :: (Monad (t m), MonadTransRunFork t, MonadRunForkBase b m) =>
t m a -> t m (b a)
runForkBaseDefault = (>>= lift . runForkBase) . runFork
This lets us complete out a MonadRunForkBase
instance for StateT
in two steps. First, we'll use our runFork
from above to make a MonadTransRunFork
instance
import Control.Monad
import qualified Control.Monad.Trans.State.Lazy as State
instance MonadTransRunFork (State.StateT s) where
runFork x = State.get >>= return . liftM fst . State.runStateT x
Then we'll use the default to provide a MonadRunForkBase
instance.
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
instance (MonadRunForkBase b m) => MonadRunForkBase b (State.StateT s m) where
runForkBase = runForkBaseDefault
We can do the same thing for RWS
import qualified Control.Monad.Trans.RWS.Lazy as RWS
instance (Monoid w) => MonadTransRunFork (RWS.RWST r w s) where
runFork x = do
r <- RWS.ask
s <- RWS.get
return $ do
(a, s', w') <- RWS.runRWST x r s
return a
instance (MonadRunForkBase b m, Monoid w) => MonadRunForkBase b (RWS.RWST r w s m) where
runForkBase = runForkBaseDefault
MonadBaseControl
Unlike MonadRunForkBase
which we developed in the previous two sections, the MonadBaseControl
from monad-control doesn't have baked in the assumption "future state changes will not in general be shared between the two futures". MonadBaseContol
and control
make an effort to restore the state from branching in control structures with restoreM :: StM m a -> m a
. This doesn't present a problem for the forkIO
from base; using forkIO
is an example provided in the MonadBaseControl
documentation. This will be a slight problem for the forkIO
from threads because of the extra m (Result a)
returned.
The m (Result a)
we want will actually be returned as an IO (Result (StM m a))
. We can get rid of the IO
and replace it with an m
with liftBase
, leaving us with m (Result (StM m a))
. We could convert an StM m a
into an m a
that restores state and then returns a
with restoreM
, but it is stuck inside a Result ~ Either SomeException
. Either l
is a functor, so we can apply restoreM
everywhere inside it, simplifying the type to m (Result (m a))
. Either l
is also Traversable
, and for any Traversable
t
we can always swap it inside a Monad
or Applicative
with sequenceA :: t (f a) -> f (t a)
. In this case, we can use the special purpose mapM
which is a combination of fmap
and sequenceA
with only a Monad
constraint. This would give m (m (Result a))
, and the m
s would be flattened together by a join in the Monad or simply using >>=
. This gives rise to
{-# LANGUAGE FlexibleContexts #-}
import Control.Concurrent
import Control.Concurrent.Thread
import qualified Control.Concurrent.Thread.Group as TG
import Control.Monad.Base
import Control.Monad.Trans.Control
import Data.Functor
import Data.Traversable
import Prelude hiding (mapM)
fork :: (MonadBaseControl IO m) =>
TG.ThreadGroup -> m a -> m (ThreadId, m (Result a))
fork tg action = do
(tid, r) <- liftBaseWith (\runInBase -> TG.forkIO tg (runInBase action))
return (tid, liftBase r >>= mapM restoreM)
When we run the m (Result a)
in the original thread, it will copy the state from the forked thread to the original thread, which may be useful. If you want to restore the state of the main thread after reading the Result
you'll need to capture it first. checkpoint
will capture the entire state and return an action to restore it.
checkpoint :: MonadBaseControl b m => m (m ())
checkpoint = liftBaseWith (\runInBase -> runInBase (return ()))
>>= return . restoreM
A complete example will show what happens to the state from two threads. Both threads get the state from when the fork
happened regardless of efforts to modify the state in the other thread. When we wait for the result in the main thread, the state in the main thread is set to the state from the forked thread. We can get the main thread's state back by running the action created by checkpoint
.
import Control.Monad.State hiding (mapM)
example :: (MonadState String m, MonadBase IO m, MonadBaseControl IO m) => m ()
example = do
get >>= liftBase . putStrLn
tg <- liftBase TG.new
(_, getResult) <- fork tg (get >>= put . ("In Fork:" ++) >> return 7)
get >>= put . ("In Main:" ++)
revert <- checkpoint
result <- getResult
(liftBase . print) result
get >>= liftBase . putStrLn
revert
get >>= liftBase . putStrLn
main = do
runStateT example "Initial"
return ()
This outputs
Initial
Right 7
In Fork:Initial
In Main:Initial