Haskell, Channels, STM, -threaded, Message Passing

Posted 2019-04-29 16:40

Question:

I am trying to use channels/STM to implement message passing in Haskell. Maybe this is a terrible idea and there is a better way to implement or use message passing in Haskell; if so, do let me know. Either way, my quest has raised some basic questions about concurrent Haskell.

I have heard great things about STM, and in particular about its implementation in Haskell. Since it supports reading from and writing to channels, and has some safety benefits, I figured I would start there. This brings up my biggest question: does

msg <- atomically $ readTChan chan

where chan is a TChan Int, block until the channel has a value on it?
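One way to check this empirically, as a minimal sketch: fork a writer that sleeps before writing, and read from the main thread. readTChan retries while the channel is empty, so the read should block until the writer's transaction commits. The 100 ms delay, the value 42 and the name blockingRead are arbitrary choices for this demo.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Hypothetical demo helper: reads one value from a channel that is
-- still empty when the read starts.
blockingRead :: IO Int
blockingRead = do
  chan <- newTChanIO
  -- Writer thread: sleep 100 ms (arbitrary) so the reader finds the channel empty.
  _ <- forkIO $ do
    threadDelay 100000
    atomically $ writeTChan chan 42
  -- readTChan retries while the channel is empty, so this transaction
  -- blocks until the writer's transaction has committed.
  atomically $ readTChan chan

main :: IO ()
main = blockingRead >>= print
```

Running it should print 42 after a short pause, with or without -threaded.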

Consider the following program:

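import Control.Concurrent (forkIO)  -- used by the forkIO variants of main below
import Control.Concurrent.STM

p :: TChan Int -> IO ()
p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q :: TChan Int -> IO ()
q chan = do
    msg1 <- atomically $ readTChan chan
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1
    putStrLn $ show msg2

main :: IO ()
main = do
    chan <- atomically $ newTChan
    p chan
    q chan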

Compile this with ghc --make -threaded and run it, and you indeed get 1 followed by 2 printed to the console. Now, suppose we do

main = do 
    chan <- atomically $ newTChan
    forkIO $ p chan 
    forkIO $ q chan

instead. Now, if we compile with -threaded, it prints either nothing, just 1, or 1 followed by 2; however, if we compile without -threaded, it always prints 1 followed by 2. Question 2: what is the difference between compiling with -threaded and without it? I imagine that without it the threads aren't really running concurrently and are just run one after the other. This is consistent with what follows.
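To see which runtime a given binary actually uses, a small check like the following may help (a sketch; what it prints depends on how the program is compiled and run). As far as I know, without -threaded all forkIO threads are green threads multiplexed onto a single OS thread by the non-threaded runtime; with -threaded, main runs as a bound thread, which changes how the scheduler interleaves it with the forked threads. By default, even a -threaded program runs Haskell code on one capability unless more are requested with the -N RTS option.

```haskell
import Control.Concurrent (getNumCapabilities, isCurrentThreadBound, rtsSupportsBoundThreads)

main :: IO ()
main = do
  -- True only when the program was linked with -threaded.
  putStrLn $ "threaded RTS:  " ++ show rtsSupportsBoundThreads
  -- Whether main runs as a bound thread (tied to one OS thread).
  bound <- isCurrentThreadBound
  putStrLn $ "main is bound: " ++ show bound
  -- How many Haskell threads can run in parallel (raised with the -N RTS option).
  caps <- getNumCapabilities
  putStrLn $ "capabilities:  " ++ show caps
```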

Now, in my thinking, if p and q were running concurrently (i.e. I forkIO'd them), they should be able to run in the opposite order. Suppose instead:

main = do
    chan <- atomically newTChan
    forkIO $ q chan
    forkIO $ p chan

Now, if I compile this without -threaded, I never get anything printed to the console. If I compile with -threaded, I sometimes do, although it is very rare to get 1 followed by 2; usually I get just 1 or nothing. I tried this with Control.Concurrent.Chan as well and got the same results.

Second big question: how do channels and fork play together, and what is going on in the above program?

At any rate, it seems that I can't naively simulate message passing with STM like this. Perhaps Cloud Haskell is an option that solves these problems; I really don't know. Any information on how to get message passing going short of serialize ~~> write to socket ~~> read from socket ~~> deserialize would be hugely appreciated.
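For message passing inside a single process, one common shape is to give each "process" its own mailbox (a TChan) and have every request carry a slot for the reply. The sketch below assumes that design; Request, serve, startDoubler and call are hypothetical names chosen for illustration, not a library API. Messages are ordinary Haskell values shared between threads, so nothing is serialized; libraries in the Cloud Haskell family are aimed at the case where messages have to leave the process.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM, forever)

-- A message carries its payload plus a one-shot slot for the reply.
data Request = Request Int (TMVar Int)

-- Answer one request: double the payload and put it in the reply slot.
serve :: Request -> STM ()
serve (Request n replyTo) = putTMVar replyTo (2 * n)

-- Start a "doubler" process that owns its mailbox and serves requests forever.
startDoubler :: IO (TChan Request)
startDoubler = do
  mailbox <- newTChanIO
  _ <- forkIO $ forever $ atomically $ readTChan mailbox >>= serve
  pure mailbox

-- Send a request, then block until the reply arrives.
call :: TChan Request -> Int -> IO Int
call mailbox n = do
  replyTo <- newEmptyTMVarIO
  atomically $ writeTChan mailbox (Request n replyTo)
  atomically $ takeTMVar replyTo

main :: IO ()
main = do
  doubler <- startDoubler
  replies <- forM [1, 2, 3] (call doubler)
  print replies  -- [2,4,6]
```

Because call waits for each reply, main naturally stays alive until the conversation is over.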

Answer 1:

No, your idea is right; this is kind of what TChans are for. You just missed a minor point about forkIO:

The problem is that your main thread does not wait for the threads created with forkIO to terminate (see here for reference): as soon as main returns, the whole program exits, even if those threads are still running.
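Stripped down to one "done" MVar per thread, the idea looks roughly like this (a sketch; forkSignal is a hypothetical helper name, not a library function). It forks q before p, the order that failed in the question: q's readTChan simply blocks until p has written. One caveat: with `io >> putMVar done ()`, the MVar is never filled if io throws, so main's takeMVar on it would never succeed; the forkChild helper in the code that follows avoids this by using forkFinally.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM

p :: TChan Int -> IO ()
p chan = do
  atomically $ writeTChan chan 1
  atomically $ writeTChan chan 2

q :: TChan Int -> IO ()
q chan = do
  msg1 <- atomically $ readTChan chan
  msg2 <- atomically $ readTChan chan
  print msg1
  print msg2

-- Fork an action and return an MVar that is filled once the action finishes.
forkSignal :: IO () -> IO (MVar ())
forkSignal io = do
  done <- newEmptyMVar
  _ <- forkIO (io >> putMVar done ())
  pure done

main :: IO ()
main = do
  chan <- newTChanIO
  -- q is forked first on purpose: its reads block until p writes.
  qDone <- forkSignal (q chan)
  pDone <- forkSignal (p chan)
  -- Keep main alive until both children have signalled completion.
  mapM_ takeMVar [qDone, pDone]
```

This should print 1 and then 2.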

So if I use the hint given in the reference:

import Control.Concurrent
import Control.Concurrent.STM

p :: Num a => TChan a -> IO ()
p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q :: Show a => TChan a -> IO ()
q chan = do
    msg1 <- atomically $ readTChan chan
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1
    putStrLn $ show msg2

main :: IO ()
main = do
    -- one "done" MVar per forked child
    children <- newMVar []
    chan <- atomically $ newTChan
    _ <- forkChild children $ p chan
    _ <- forkChild children $ q chan
    -- block until every child has signalled that it finished
    waitForChildren children

-- Take the children's MVars one at a time and wait on each of them.
waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

-- Fork a thread and register an MVar that is filled when it terminates;
-- forkFinally fills it even if the thread dies with an exception.
forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

it works as expected:

d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main             ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe 
1
2
d:/Temp $

And of course it continues to work if you swap the order of the calls to p and q.
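The same reasoning carries over to Control.Concurrent.Chan, which the question also tried. Another way to keep main alive, sketched here as an alternative to tracking children, is to fork only the producer and run the consumer on the main thread: main cannot return before q has received both messages, whichever thread happens to run first. Here p and q are adapted from the question to Chan, and q returns its messages instead of printing them.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan

p :: Chan Int -> IO ()
p chan = do
  writeChan chan 1
  writeChan chan 2

-- Read two messages; readChan blocks while the channel is empty.
q :: Chan Int -> IO (Int, Int)
q chan = do
  msg1 <- readChan chan
  msg2 <- readChan chan
  pure (msg1, msg2)

main :: IO ()
main = do
  chan <- newChan
  -- Only the producer is forked; the consumer runs on the main thread,
  -- so main only finishes after both messages have been received.
  _ <- forkIO (p chan)
  (msg1, msg2) <- q chan
  print msg1
  print msg2
```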