Creating a Behavior for a continuously measurable

2020-07-03 14:31发布

问题:

I would like to create a Behavior t a from an IO a, with the intended semantics that the IO action would be run every time the behavior is sampled:

{- language FlexibleContexts #-}
import Reflex.Dom
import Control.Monad.Trans

onDemand :: (MonadWidget t m, MonadIO (PullM t)) => IO a -> m (Behavior t a)

I hoped I could do this by just executing the measurement in a pull:

onDemand measure = return $ pull (liftIO measure)

However, the resulting Behavior never changes after an initial measurement.

The workaround I could come up with was to create a dummy Behavior that changes "frequently enough" and then create a fake dependency on that:

import Data.Time.Clock as Time

hold_ :: (MonadHold t m, Reflex t) => Event t a -> m (Behavior t ())
hold_ = hold () . (() <$)

onDemand :: (MonadWidget t m, MonadIO (PullM t)) => IO a -> m (Behavior t a)
onDemand measure = do
    now <- liftIO Time.getCurrentTime
    tick <- hold_ =<< tickLossy (1/1200) now
    return $ pull $ do
        _ <- sample tick
        liftIO measure

This then works as expected; but since Behaviors can only be sampled on demand anyway, this shouldn't be necessary.

What is the correct way to create a Behavior for a continuous, observable-at-any-time phenomenon?

回答1:

Doing this in Spider looks impossible. Internal reasoning ahead.

In the Spider implementation of Reflex, one of the possible Behaviors is to pull the value.

data Behavior a
   = BehaviorHold !(Hold a)
   | BehaviorConst !a
   | BehaviorPull !(Pull a)

A Pulled value consists of how to compute the value when needed, pullCompute, and a cached value to avoid unnecessary re-computation, pullValue.

data Pull a
   = Pull { pullValue :: !(IORef (Maybe (PullSubscribed a)))
          , pullCompute :: !(BehaviorM a)
          }

Ignoring the ugly environment of BehaviorM, liftIO lifts an IO computation the obvious way, it runs it when the BehaviorM needs to be sampled. In the Pull, your behavior is observed once but isn't re-observed because the cached value isn't invalidated.

The cached value PullSubscribed a consists of the value a, a list of other values that need to be invalidated if this value is invalidated, and some boring memory management stuff.

data PullSubscribed a
   = PullSubscribed { pullSubscribedValue :: !a
                    , pullSubscribedInvalidators :: !(IORef [Weak Invalidator])
                    -- ... boring memory stuff
                    }

An Invalidator is a quantified Pull that's enough to get the memory reference to recursively read the invalidators to invalidate and write the cached value to Nothing.

To pull constantly we'd like to be able to constantly invalidate our own BehaviorM. When executed, the environment passed to the BehaviorM has a copy of its own invalidator, which is used by dependencies of the BehaviorM to invalidate it when they themselves become invalid.

From the internal implementation of readBehaviorTracked there seems to be no way that the behavior's own invalidator (wi) can ever end up in the list of subscribers that are invalidated when it is sampled (invsRef).

    a <- liftIO $ runReaderT (unBehaviorM $ pullCompute p) $ Just (wi, parentsRef)
    invsRef <- liftIO . newIORef . maybeToList =<< askInvalidator
    -- ...
    let subscribed = PullSubscribed
          { pullSubscribedValue = a
          , pullSubscribedInvalidators = invsRef
          -- ...
          }

Outside of the internals, if there does exist a way to constantly sample a Behavior it would involve a MonadFix (PullM t) instance or mutual recursion through fixing of pull and sample:

onDemand :: (Reflex t, MonadIO (PullM t)) => IO a -> Behavior t a
onDemand read = b
    where
        b = pull go
        go = do
             sample b
             liftIO read

I don't have a Reflex environment to try this in, but I don't think the results will be pretty.



回答2:

I've been experimenting with this for a while and found a workaround. It seems to work with the latest version of reflex to date. The trick is to forcefully invalidate the cached value every time you evaluate a given IO action.

import qualified Reflex.Spider.Internal as Spider

onDemand :: IO a -> Behavior t a
onDemand ma = SpiderBehavior . Spider.Behavior
            . Spider.BehaviorM . ReaderT $ computeF
  where
    computeF (Nothing, _) = unsafeInterleaveIO ma
    computeF (Just (invW,_), _) = unsafeInterleaveIO $ do
        toReconnect <- newIORef []
        _ <- Spider.invalidate toReconnect [invW]
        ma

It is important to use unsafeInterleaveIO to run the invalidator as late as possible, so that it invalidates an existing thing.

There is another problem with this code: I ignore toReconnect reference and the result of invalidate function. In current version of reflex, the latter is always empty, so it should not cause any problems. But I am not sure about toReconnect: from the code, it seems that if it has some subscribed switches, they might break if not treated properly. Though I am not sure if this kind of behavior can have switches subscribed or not.

UPDATE for those who really want to implement this: The code above can deadlock in some complicated setups. My solutions was to perform invalidation slightly after the computation itself in a separate thread. Here is the complete code snippet. The solution by the link seems to work correctly (using it for almost a year now in production).