Akka.Net why are my pool actors not scaling up or

Posted 2019-06-08 17:35

I am trying to build an actor system that can scale up/down based on the workload using the following configuration.

    akka {
      actor {
        serializers {
          wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
        }
        serialization-bindings {
          "System.Object" = wire
        }
        deployment {
          /analysis {
            router = round-robin-pool
            routees.paths = ["/user/analysis"]
            resizer {
              enabled = on
              lower-bound = 1
              upper-bound = 20
            }
          }
        }
      }
    }

In my main loop, I create 3000 messages and push them to the actors for processing:

        var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(FromConfig.Instance);
        //var runAnalysisProps = Props.Create<RunAnalysisActor>().WithRouter(new RoundRobinPool(0, new DefaultResizer(lower: 1, upper: 10))); // This does not help either
        var analysisRef = RunAnalysisActorSystem.ActorOf(runAnalysisProps, "analysis");

        Logger.LogMessage("Started Posting all messages", ConsoleColor.Blue);
        for (int i = 0; i < 3000; i++)
        {
            analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100+ i}", $"c:\\temp\\fic{100 + i}.dat"));

        }
        Logger.LogMessage("Completed posting all messages", ConsoleColor.Blue);

Is there anything wrong with the for loop above? To me it looks like it is sending all the messages to one actor ref rather than to a router.

My ReceiveActor is a very simple one that only logs:

        public RunAnalysisActor()
        {
            Receive<AnalysisMessage>(message =>
            {
                Logger.LogMessage($"Analysis Started => Id: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
                var waitTime = rnd.Next(5) * 100;
                //Logger.LogMessage($"Waiting => {waitTime}");
                Thread.Sleep(10);
                Logger.LogMessage($"Analysis Completed=> AId: {message.AnalysisId}, Loop: {message.LoopName}, File:{message.FilePath}");
            });
        }

I have also overridden AroundPreStart and PostStop to see how many actors are started and stopped. When the application starts, it only creates the lower-bound number of actors (or a minimum of 2). I expected the pool to create more actors based on the pressure-threshold (default 1), but that does not happen, and the pool does not shrink when there are no messages to process either.
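
For reference, the resizer settings discussed above can be written out explicitly instead of relying on defaults. This is a sketch only: the bounds come from the configuration above, while the remaining values are assumed to be the Akka.NET defaults and should be checked against the reference configuration of the version in use. `routees.paths` is left out here because, as far as is known, that key applies to group routers rather than pools.

```hocon
akka.actor.deployment {
  /analysis {
    router = round-robin-pool
    resizer {
      enabled = on
      lower-bound = 1
      upper-bound = 20
      # Assumed defaults below; verify against your Akka.NET version.
      pressure-threshold = 1     # a routee counts as busy when it is processing and has >= 1 message queued
      rampup-rate = 0.2          # grow by 20% when all routees are busy
      backoff-threshold = 0.3    # shrink only when fewer than 30% of routees are busy
      backoff-rate = 0.1         # remove 10% of routees when backing off
      messages-per-resize = 10   # resizing is evaluated every 10 messages through the router
    }
  }
}
```

With these values, resizing is only considered after every tenth message passes through the router, so lowering `messages-per-resize` may make any scaling easier to observe in a short test run.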

Can someone please help me understand what I am doing wrong here? Thanks.

Update 1:

I think I figured out the actual issue. When posting the messages, I need to change my logic to append a path to my deployment config, routees.paths = ["/user/analysis"], and also use the Ask function, analysisRef.Ask<Routees>, to get the correct reference, something like this:

        RunAnalysisActorSystem.Scheduler.Advanced.ScheduleRepeatedly(
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromMilliseconds(500),
            () => {
                if (analysisRef.Ask<Routees>(new GetRoutees()).Result.Members.Any())
                {
                    var i = rand.Next();
                    analysisRef.Tell(new AnalysisMessage(100 + i, $"FIC{100 + i}", $"c:\\temp\\fic{100 + i}.dat"));
                }
            }
        );
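
The check above blocks a scheduler thread on `.Result`. To see whether the pool actually grows, the routee count can also be read asynchronously. This is a minimal sketch, assuming the `GetRoutees` and `Routees` messages from `Akka.Routing` as used above; `RouteeProbe` and `CountAsync` are hypothetical names, not part of Akka.NET or the original code.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;

// Hypothetical helper (not part of Akka.NET): asks a router for its current routees.
public static class RouteeProbe
{
    // Returns how many routees the router reports at the time of the call.
    // The returned task fails if the router does not reply within the timeout.
    public static async Task<int> CountAsync(IActorRef router, TimeSpan timeout)
    {
        var routees = await router.Ask<Routees>(new GetRoutees(), timeout);
        return routees.Members.Count();
    }
}
```

For example, logging `await RouteeProbe.CountAsync(analysisRef, TimeSpan.FromSeconds(3))` before and after the 3000 messages are posted would show whether the pool size changed.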
