Using Monad/ST for non-concurrent message passing

2019-07-20 05:58发布

I am trying to work out a data structure for the following situation.

Graph Structure

I plan to have a graph of nodes with un-weighted, directed edges: Graph = [Node]

Each node has:

  1. Some TBD internal (persistent) state
  2. A queue of incoming messages
  3. A type of message it can send determined by a function that accepts the current node state (with the possibility of failure)
  4. A list of edges

Node { nodeState :: NodeState, inbox :: Queue NodeMessage, nodeMessage :: (NodeState -> Maybe NodeMessage), connections::[NodeEdge] }

Each edge is an intermediary step capturing pending messages for a target node

NodeEdge { pendingMessage:: Maybe NodeMessage, targetNode :: Node }

Message Passing

The message passing happens in phases and is not-conccurent (though the queues may be processed in parallel to reduce computation time).

  • Phase 1: Check the inbox of every node, processing any messages and updating the NodeState if relevant.
  • Phase 2: Have every node run nodeMessage, if this results in Just NodeMessage, send NodeMessage to every connection ([NodeEdge])
  • Phase 3: Check every node edge, if it has a message, add it to the target node's message queue.

Monat/ST

My original plan was to assign every node an ID (probably a simple Int) and store each node in a Map Int Node. I haven't tried the ST Monad before but I figured I could use something like ST s (M.Map Int Node). For any given phase each node's message send activity could be processed in O(k log N).

On the other hand if nodes/edges were able to update the mutable state of their edges/nodes then any single queue could be processed in O(k).

While the ST/Map method seems fairly intuitive, having the whole graph mutable is beyond me.

Any suggestions / tips / recommended reading?

1条回答
Root(大扎)
2楼-- · 2019-07-20 06:32

I am not going to mark this answer as correct because it does not truly answer the question. However it is the solution that I'm going with.

Because the number of nodes in my graph is never going to change I realised I could use an array. I'm actually reconsidering using a mutable datatype - even though I get a much simpler workflow updating the array I get less benefits of laziness and I end up writing a ton of imperative style code. I'm actually thinking about using an Array and the State Monad, rather than ST.

Here is a bit of test code I wrote, using STArray. A "proper" answer to this question would be a similar data type specifically for Graphs - perhaps out there is an STGraph library?

Anyway - here is the example code using STArray:

import Control.Monad.ST
import Data.Array.ST
import Data.Array

import qualified Data.Dequeue as DQ

type Id = Int

data Node = Node {
    nodeId :: Id,
    nodeState :: NodeState,
    nodeInbox :: DQ.BankersDequeue NodeMessage,
    nodeMessage :: (NodeState -> Maybe NodeMessage),
    connections :: [NodeEdge] }

instance Show Node where
    show x = "Node: " ++ (show . nodeId $ x) ++ " :: Inbox: " ++ (show . nodeInbox $ x) ++ " :: " ++ (show . connections $ x)

data NodeEdge = NodeEdge { pendingMessage:: Maybe NodeMessage, targetNode :: Id } deriving Show

data NodeState = NodeState { stateLevel :: Int } deriving Show

data NodeMessage = NodeMessage { value :: Int } deriving Show

es = [[NodeEdge Nothing 1,NodeEdge Nothing 2],[NodeEdge Nothing 0,NodeEdge Nothing 2],[NodeEdge Nothing 0,NodeEdge Nothing 1]]
ns = take 3 $ map (\x -> Node x (NodeState 0) (DQ.fromList []) (\_ -> Nothing) (es !! x)) $ [0,1..]

testArray :: Array Int Node
testArray = listArray (0,2) ns

testSTarr = do  arr <- newListArray (0,2) ns :: ST s (STArray s Int Node)
                a <- readArray arr 1
                let i = targetNode . head $ connections a
                b <- readArray arr i
                let m = NodeMessage 2
                    ms = DQ.pushBack (nodeInbox b) m
                    b' = b { nodeInbox = ms }
                writeArray arr (nodeId b) b'
                return arr

testSTarr' x = do a <- readArray x 0
                  return a

bp = testSTarr >>= testSTarr'

main = do
            print $ runST bp 
            return ()
查看更多
登录 后发表回答