Why is the Flink container vcore count always 1?

Posted 2019-02-20 02:11

I am running Flink on YARN (more precisely, on an AWS EMR YARN cluster).

From the Flink documentation and source code, I understand that by default Flink requests as many vcores from YARN for each TaskManager container as there are slots per TaskManager. I also confirmed this in the source code:

    // Resource requirements for worker containers
    int taskManagerSlots = taskManagerParameters.numSlots();
    int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
    Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);

    resourceManagerClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority));
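To make the selection rule in the snippet above concrete, here is a minimal, dependency-free Java model of it. It is a sketch, not Flink code: the configuration is modelled as a plain Map, the class name is hypothetical, and the key name "yarn.containers.vcores" is assumed to be what ConfigConstants.YARN_VCORES refers to.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the vcore selection in the Flink snippet (not Flink code).
// Assumptions: the configuration is a plain Map, and "yarn.containers.vcores"
// is taken to be the key behind ConfigConstants.YARN_VCORES.
class VcoreRequestModel {
    static final String YARN_VCORES_KEY = "yarn.containers.vcores";

    // Same rule as config.getInteger(YARN_VCORES, Math.max(taskManagerSlots, 1)):
    // an explicit vcore setting wins; otherwise one vcore per slot, at least 1.
    static int requestedVcores(Map<String, Integer> config, int taskManagerSlots) {
        return config.getOrDefault(YARN_VCORES_KEY, Math.max(taskManagerSlots, 1));
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        System.out.println(requestedVcores(config, 3)); // 3 (the -ys 3 case)
        config.put(YARN_VCORES_KEY, 2);
        System.out.println(requestedVcores(config, 3)); // 2 (explicit override)
    }
}
```

With three slots and no explicit vcore setting, the model yields 3, the same value seen in the debugger, so the request Flink builds is not where the 1 comes from.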

When I start Flink with -yn 1 -ys 3, I expect YARN to allocate 3 vcores to the single TaskManager container. However, when I check the vcore count of each container in the YARN ResourceManager web UI, it is always 1. The YARN ResourceManager logs also show 1 vcore.

I debugged the Flink source code down to the lines pasted above and saw that the value of vcores is 3. This really confuses me; can anyone help clarify? Thanks.

3 answers
手持菜刀,她持情操
#2 · 2019-02-20 02:44

I finally found the answer. YARN is using the DefaultResourceCalculator allocation strategy, so the YARN RM counts only memory: even though Flink requested 3 vcores, YARN simply ignores the CPU core count.
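The difference between the two YARN calculators can be sketched with a simplified Java model. This is an illustration under stated assumptions, not the Hadoop API: the Resource record, the method names and the 16384 MB / 8 vcore cluster are hypothetical, and the single reported vcore under the memory-only view mirrors the behaviour described in the question rather than reproducing Hadoop's code.

```java
// Simplified model, NOT the Hadoop API: contrasts how a memory-only
// calculator and a dominant-resource calculator treat one container request.
// Assumptions: names and cluster size are hypothetical; the "1 vcore" result
// mirrors the behaviour reported in the question.
class CalculatorModel {
    record Resource(long memoryMb, int vcores) {}

    // DefaultResourceCalculator-like view: only memory counts, and the
    // container ends up reported with a single vcore.
    static Resource memoryOnly(Resource request) {
        return new Resource(request.memoryMb(), 1);
    }

    // DominantResourceCalculator-like view: memory and vcores are both kept.
    static Resource dominant(Resource request) {
        return new Resource(request.memoryMb(), request.vcores());
    }

    // Dominant share: the larger of the request's memory share and CPU share
    // of the cluster; a dominant-resource calculator compares requests by it.
    static double dominantShare(Resource r, Resource cluster) {
        double memShare = (double) r.memoryMb() / cluster.memoryMb();
        double cpuShare = (double) r.vcores() / cluster.vcores();
        return Math.max(memShare, cpuShare);
    }

    public static void main(String[] args) {
        Resource request = new Resource(1400, 3);   // -ys 3 -> 3 vcores requested
        Resource cluster = new Resource(16384, 8);  // hypothetical cluster capacity
        System.out.println(memoryOnly(request).vcores());    // 1
        System.out.println(dominant(request).vcores());      // 3
        System.out.println(dominantShare(request, cluster)); // 0.375
    }
}
```

In the memory-only view the 3 requested vcores collapse to 1, matching the observation in the question; in the dominant-resource view the request keeps its 3 vcores and, on this hypothetical cluster, CPU (3/8) rather than memory (1400/16384) is the dominant share.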

(Account suspended)
#3 · 2019-02-20 02:46

@yinhua.

When you start a session with ./bin/yarn-session.sh, you need to add the -s argument:

-s,--slots Number of slots per TaskManager

Details:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html
  2. https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#usage
爷、活的狠高调
#4 · 2019-02-20 02:53

An answer from Kien Truong

Hi,

You have to enable CPU scheduling in YARN; otherwise, it always shows only 1 CPU allocated to each container, regardless of how many Flink tries to allocate. To enable it, add (or edit) the following property in capacity-scheduler.xml:

<property>
 <name>yarn.scheduler.capacity.resource-calculator</name>
 <!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->
 <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
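Before restarting the ResourceManager, it can help to check which calculator a given capacity-scheduler.xml actually selects. The following Java sketch uses only the JDK's DOM parser; the class and method names are hypothetical, and treating a missing property as DefaultResourceCalculator is an assumption about the Capacity Scheduler default.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper: reports which resource calculator a Hadoop-style
// <configuration> document selects. Assumption: when the property is absent,
// the Capacity Scheduler falls back to DefaultResourceCalculator.
class CalculatorCheck {
    static final String KEY = "yarn.scheduler.capacity.resource-calculator";
    static final String DEFAULT_CALCULATOR =
        "org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator";

    static String resourceCalculator(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                // Comment nodes (like the commented-out <value>) are not elements,
                // so only the active <value> is picked up here.
                if (KEY.equals(childText(prop, "name"))) {
                    return childText(prop, "value");
                }
            }
            return DEFAULT_CALCULATOR;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse configuration", e);
        }
    }

    // Trimmed text of the first descendant element with the given tag, or null.
    private static String childText(Element parent, String tag) {
        Node node = parent.getElementsByTagName(tag).item(0);
        return node == null ? null : node.getTextContent().trim();
    }

    // True when the DominantResourceCalculator (CPU-aware scheduling) is configured.
    static boolean cpuSchedulingEnabled(String xml) {
        String calc = resourceCalculator(xml);
        return calc != null && calc.endsWith(".DominantResourceCalculator");
    }

    public static void main(String[] args) {
        String xml = "<configuration><property>"
            + "<name>yarn.scheduler.capacity.resource-calculator</name>"
            + "<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->"
            + "<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>"
            + "</property></configuration>";
        System.out.println(cpuSchedulingEnabled(xml));                // true
        System.out.println(cpuSchedulingEnabled("<configuration/>")); // false
    }
}
```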

If the TaskManager memory is, for example, 1400 MB, Flink reserves some of it for off-heap memory, so the actual heap size is smaller.

This is controlled by two settings:

containerized.heap-cutoff-min: default 600MB

containerized.heap-cutoff-ratio: default 15% of TM's memory

That's why your TM's heap size is limited to ~800 MB (1400 - 600): 15% of 1400 MB is only 210 MB, which is below the 600 MB minimum, so the minimum applies.

Regards,

Kien
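The heap cutoff arithmetic in the answer above can be written out as a small Java sketch. It assumes the heap is simply the container memory minus the larger of containerized.heap-cutoff-min and containerized.heap-cutoff-ratio times the container memory; any further deductions Flink may make are ignored, and the class name is hypothetical.

```java
// Simplified model of the container heap cutoff (not Flink code).
// Assumption: heap = container memory - max(cutoffMin, ratio * container memory);
// other deductions Flink may apply are ignored.
class HeapCutoffModel {
    static long heapMb(long containerMb, long cutoffMinMb, double cutoffRatio) {
        long ratioCutoff = (long) (containerMb * cutoffRatio);
        long cutoff = Math.max(cutoffMinMb, ratioCutoff);
        return containerMb - cutoff;
    }

    public static void main(String[] args) {
        // 15% of 1400 MB is 210 MB, below the 600 MB minimum, so 600 MB is cut off.
        System.out.println(heapMb(1400, 600, 0.15)); // 800
        // For a larger container the ratio wins: 15% of 8192 MB is about 1228 MB.
        System.out.println(heapMb(8192, 600, 0.15)); // 6964
    }
}
```

Since 600 / 1400 is about 43%, any cutoff ratio below that leaves the 600 MB minimum in charge for a 1400 MB container.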
