I want to have actors running on various processes (or nodes) send messages to other actors running off of different processes (or nodes), all while maintaining fault-tolerance and load balancing. I am currently attempting to use Akka.Cluster's Sharding feature to accomplish this.
However, I am not sure how to accomplish this...
I have the following code that reflects my seed node:
let configurePort port =
let config = Configuration.parse ("""
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """ + port.ToString() + """
}
}
cluster {
auto-down-unreachable-after = 5s
seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
}
persistence {
journal.plugin = "akka.persistence.journal.inmem"
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
}
}
""")
config.WithFallback(ClusterSingletonManager.DefaultConfig())
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2551)
let shardRegion1 = spawnSharded id system1 "shardRegion1" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system2 = System.create "cluster-system" (configurePort 2552)
let shardRegion2 = spawnSharded id system2 "shardRegion2" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system3 = System.create "cluster-system" (configurePort 2553)
let shardRegion3 = spawnSharded id system3 "shardRegion3" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(3000)
// NOTE: Even thou we sent all messages through single shard region,
// some of them will be executed on the second and third one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-1", "entity-2", "hello world 2")
shardRegion1 <! ("shard-2", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")
System.Threading.Thread.Sleep(1000)
let printShards shardRegion =
async {
let! (reply:AskResult<ShardRegionStats>) = (retype shardRegion) <? GetShardRegionStats.Instance
let (stats: ShardRegionStats) = reply.Value
for kv in stats.Stats do
printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
} |> Async.RunSynchronously
let printNodes() =
printfn "\nShards active on node 'localhost:2551':"
printShards shardRegion1
printfn "\nShards active on node 'localhost:2552':"
printShards shardRegion2
printfn "\nShards active on node 'localhost:2553':"
printShards shardRegion3
printNodes()
The output looks something like this:
Shards active on node 'localhost:2551':
Shard 'shard-1' has 2 entities on it
Shard 'shard-2' has 2 entities on it
Shards active on node 'localhost:2552':
I then have a separate process that executes the following code:
let configurePort port =
let config = Configuration.parse ("""
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = "0"
}
}
cluster {
auto-down-unreachable-after = 5s
seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
}
persistence {
journal.plugin = "akka.persistence.journal.inmem"
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
}
}
""")
config.WithFallback(ClusterSingletonManager.DefaultConfig())
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2554)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system2 = System.create "cluster-system" (configurePort 2555)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system3 = System.create "cluster-system" (configurePort 2556)
let shardRegion3 = spawnSharded id system3 "printer" <| props (actorOf2 consumer)
My cluster system (running on a separate process) recognizes new nodes that are joining:
> [INFO][3/15/2017 9:12:13 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52953] is JOINING, roles []
[INFO][3/15/2017 9:12:14 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52956] is JOINING, roles []
[INFO][3/15/2017 9:12:15 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52961] is JOINING, roles []
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52953] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52956] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52961] to [Up]
Conclusion:
In conclusion, I want to have actors running on various processes (or nodes) send messages to other actors running off of different processes (or nodes) while maintaining fault-tolerance and load balancing. I am currently attempting to use Akka.Cluster's Sharding feature to accomplish this.
Appendix:
open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif
#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r @"C:\Users\Snimrod\Documents\Visual Studio 2015\Projects\Temp\packages\Akka.FSharp.1.1.3\lib\net45\Akka.FSharp.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"
open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence
open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion
In order to maintain consistent view of the shards and their locations, Akka.Cluster.Sharding persistent backend must point to a database that is visible to all of the processes. In your configuration, you're using
akka.persistence.journal.inmem
which is in-memory data store (used only for tests and development). It won't be visible from another processes.You'll need to configure a persistent backend in order for shards to be visible between nodes living on different machines/processes. You can do that i.e. by using Akka.Persistence.SqlServer or any other plugin. This is the most basic configuration for your persistence backend used only by sharding:
For something more practical, please refer to this article.
Also keep in mind that both Akka.Cluster.Sharding and Akka.Persistence plugins are available only in prerelease mode (so you need to install-package with -pre flag).