Cluster sharding client not connecting with host

Posted 2019-09-15 16:15

Question:

After recent investigation and a Stack Overflow question, I realise that cluster sharding is a better option than a cluster-consistent-hash-router. But I am having trouble getting a two-process cluster going.

One process is the Seed and the other is the Client. The Seed node seems to continuously throw dead letter messages (see the end of this question).

The Seed's HOCON follows:

akka {
loglevel = "INFO"                    

actor {
    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
    serializers {
        wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
    }
    serialization-bindings {
        "System.Object" = wire
    }
}                    

remote {
    dot-netty.tcp {
        hostname = "127.0.0.1"
        port = 5000
    }
}

persistence {
    journal {
        plugin = "akka.persistence.journal.sql-server"
        sql-server {
            class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
            schema-name = dbo
            auto-initialize = on
            connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
            plugin-dispatcher = "akka.actor.default-dispatcher"
            connection-timeout = 30s
            table-name = EventJournal
            timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
            metadata-table-name = Metadata
        }
    }

    sharding {
        connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
        auto-initialize = on
        plugin-dispatcher = "akka.actor.default-dispatcher"
        class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
        connection-timeout = 30s
        schema-name = dbo
        table-name = ShardingJournal
        timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
        metadata-table-name = ShardingMetadata
    }
}

snapshot-store {
    sharding {
        class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
        plugin-dispatcher = "akka.actor.default-dispatcher"
        connection-string = "Data Source=localhost;Integrated Security=True;MultipleActiveResultSets=True;Initial Catalog=ClusterExperiment01"
        connection-timeout = 30s
        schema-name = dbo
        table-name = ShardingSnapshotStore
        auto-initialize = on
    }
}

cluster {
    seed-nodes = ["akka.tcp://my-cluster-system@127.0.0.1:5000"]
    roles = ["Seed"]

    sharding {
        journal-plugin-id = "akka.persistence.sharding"
        snapshot-plugin-id = "akka.snapshot-store.sharding"
    }
}
}

I have a method that essentially turns the above into a Config like so:

var config = NodeConfig.Create(/* HOCON above */).WithFallback(ClusterSingletonManager.DefaultConfig());

Without the WithFallback call I get a null reference exception during config generation.
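
For what it's worth, here is a minimal sketch of what that helper amounts to, assuming the HOCON above is passed in as a plain string (NodeConfig is my own wrapper, so this is only an approximation):

using Akka.Configuration;

public static class NodeConfig
{
    // Parse the HOCON text above into an Akka.NET Config object; the
    // WithFallback(ClusterSingletonManager.DefaultConfig()) call above then
    // layers the cluster tools' default settings underneath it.
    public static Config Create(string hocon) =>
        ConfigurationFactory.ParseString(hocon);
}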

The system is then created like so:

var system = ActorSystem.Create("my-cluster-system", config);

The client creates its system in the same manner and the HOCON is almost identical aside from:

{
remote {
    dot-netty.tcp {
        hostname = "127.0.0.1"
        port = 5001
    }
}
cluster {
    seed-nodes = ["akka.tcp://my-cluster-system@127.0.0.1:5000"]
    roles = ["Client"]
    role."Seed".min-nr-of-members = 1
    sharding {
        journal-plugin-id = "akka.persistence.sharding"
        snapshot-plugin-id = "akka.snapshot-store.sharding"
    }
}
}

The Seed node creates the sharding like so:

ClusterSharding.Get(system).Start(
   typeName: "company-router",
   entityProps: Props.Create(() => new CompanyDeliveryActor()),                    
   settings: ClusterShardingSettings.Create(system),
   messageExtractor: new RouteExtractor(100)
);
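
Start returns the local shard region as an IActorRef, so the Seed can send routed messages directly through it. A rough usage sketch, where someRoutedMessage is a placeholder for one of my IHasRouting messages:

IActorRef companyRegion = ClusterSharding.Get(system).Start(
    typeName: "company-router",
    entityProps: Props.Create(() => new CompanyDeliveryActor()),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new RouteExtractor(100));

// Delivered to the entity whose id the RouteExtractor derives from the message.
companyRegion.Tell(someRoutedMessage);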

And the client creates a sharding proxy like so:

ClusterSharding.Get(system).StartProxy(
    typeName: "company-router",
    role: "Seed",
    messageExtractor: new RouteExtractor(100));
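
StartProxy also returns an IActorRef. To avoid racing the join handshake with the seed, the first send can be deferred until this node is actually a cluster member (Cluster here is Akka.Cluster.Cluster; someRoutedMessage is the same placeholder as above):

IActorRef proxy = ClusterSharding.Get(system).StartProxy(
    typeName: "company-router",
    role: "Seed",
    messageExtractor: new RouteExtractor(100));

// Only send once this node has joined the cluster and is marked Up.
Cluster.Get(system).RegisterOnMemberUp(() =>
{
    // someRoutedMessage is a placeholder for an IHasRouting message.
    proxy.Tell(someRoutedMessage);
});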

The RouteExtractor is:

public class RouteExtractor : HashCodeMessageExtractor
{
    public RouteExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
    {   
    }
    public override string EntityId(object message) => (message as IHasRouting)?.Company?.VolumeId.ToString();
    public override object EntityMessage(object message) => message;
}

In this scenario the VolumeId is always the same (just for the sake of the experiment).
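
For context, the routing contract is roughly this shape; the Guid for VolumeId is illustrative, since the extractor only uses its ToString() value:

public interface IHasRouting
{
    Company Company { get; }
}

public class Company
{
    // Illustrative type; only VolumeId.ToString() matters to the extractor.
    public System.Guid VolumeId { get; set; }
}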

Both processes come to life, but the Seed keeps logging this dead-letter message:

[INFO][7/05/2017 9:00:58 AM][Thread 0003][akka://my-cluster-system/user/sharding/company-routerCoordinator/singleton/coordinator] Message Register from akka.tcp://my-cluster-system@127.0.0.1:5000/user/sharding/company-router to akka://my-cluster-system/user/sharding/company-routerCoordinator/singleton/coordinator was not delivered. 4 dead letters encountered.

P.S. I am not using Lighthouse.

Answer 1:

From a quick look, you're starting a cluster sharding proxy on your client node and telling it that the sharded nodes are those with the Seed role. This doesn't match the cluster sharding definition on the seed node, where you haven't specified any role.

Since there is no role to limit it, cluster sharding on the seed node will treat every node in the cluster as capable of hosting sharded actors - including the client node, which doesn't have a (non-proxy) cluster sharding instance running on it.

This may not be the only issue, but you could either host cluster sharding on all of your nodes, or use ClusterShardingSettings.Create(system).WithRole("Seed") to limit the shard to a specific subset of nodes in the cluster (those with the Seed role).
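
If you'd rather keep that in configuration, the same restriction can usually be expressed through the sharding section that ClusterShardingSettings.Create(system) reads (assuming the default akka.cluster.sharding layout), with the proxy's role parameter kept in sync:

akka.cluster.sharding.role = "Seed"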



Answer 2:

Thanks Horusiath, that's fixed it:

return sharding.Start(
   typeName: "company-router",
   entityProps: Props.Create(() => new CompanyDeliveryActor()),
   settings: ClusterShardingSettings.Create(system).WithRole("Seed"),
   messageExtractor: new RouteExtractor(100));

The clustered shard is now communicating between the two processes. Thanks very much for that.