How to set PYTHONHASHSEED on AWS EMR

Published 2019-05-07 12:39

Question:

Is there any way to set an environment variable on all nodes of an EMR cluster?

I am getting an error about the hash seed when trying to use reduceByKey() in Python 3 PySpark. I can see this is a known issue, and that the environment variable PYTHONHASHSEED needs to be set to the same value on all nodes of the cluster, but I haven't had any luck with it.
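
The job that triggers it is essentially a plain reduceByKey() over string keys; a stripped-down sketch of it (the app name is arbitrary) looks like this:

from pyspark import SparkContext

sc = SparkContext(appName="hashseed-repro")  # arbitrary app name

# reduceByKey() partitions records by the hash of each key; with string keys
# on Python 3.3+ the shuffle aborts with an error along the lines of
# "Randomness of hash of string should be disabled via PYTHONHASHSEED"
# unless the variable is set to the same value on every node.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(pairs.reduceByKey(lambda x, y: x + y).collect())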

I have tried adding a variable to spark-env through the cluster configuration:

[
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3",
          "PYTHONHASHSEED": "123"
        }
      }
    ]
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

but this doesn't work. I have also tried adding a bootstrap script:

#!/bin/bash
export PYTHONHASHSEED=123

but this also doesn't seem to do the trick.

Answer 1:

I believe that /usr/bin/python3 isn't picking up the environment variable PYTHONHASHSEED that you are defining in the cluster configuration under the spark-env scope.

You ought to use python34 instead of /usr/bin/python3 and set the configuration as follows:

[
   {
      "classification":"spark-defaults",
      "properties":{
         // [...]
      }
   },
   {
      "configurations":[
         {
            "classification":"export",
            "properties":{
               "PYSPARK_PYTHON":"python34",
               "PYTHONHASHSEED":"123"
            }
         }
      ],
      "classification":"spark-env",
      "properties":{
        // [...]
      }
   }
]
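
If you create the cluster programmatically, one way to apply that classification is through boto3's run_job_flow; here is a rough sketch (the instance types, instance count, roles, and region below are placeholders, not values from this answer):

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # placeholder region

response = emr.run_job_flow(
    Name="pythonhashseed-test",
    ReleaseLabel="emr-4.8.2",
    Applications=[{"Name": "Spark"}],
    Configurations=[
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {
                        "PYSPARK_PYTHON": "python34",
                        "PYTHONHASHSEED": "123",
                    },
                }
            ],
            "Properties": {},
        }
    ],
    Instances={
        "MasterInstanceType": "m3.xlarge",   # placeholder instance types
        "SlaveInstanceType": "m3.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",       # default EMR roles
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])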

Now, let's test it. I define a bash script that calls both pythons:

#!/bin/bash

echo "using python34"
for i in `seq 1 10`;
  do
    python -c "print(hash('foo'))";
  done
echo "----------------------"
echo "using /usr/bin/python3"
for i in `seq 1 10`;
  do
    /usr/bin/python3 -c "print(hash('foo'))";
  done

The verdict:

[hadoop@ip-10-0-2-182 ~]$ bash test.sh
using python34
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
----------------------
using /usr/bin/python3
8867846273747294950
-7610044127871105351
6756286456855631480
-4541503224938367706
7326699722121877093
3336202789104553110
3462714165845110404
-5390125375246848302
-7753272571662122146
8018968546238984314

PS1: I am using AMI release emr-4.8.2.

PS2: Snippet inspired by this answer.

EDIT: I have tested the following using pyspark.

16/11/22 07:16:56 INFO EventLoggingListener: Logging events to hdfs:///var/log/spark/apps/application_1479798580078_0001
16/11/22 07:16:56 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Python version 3.4.3 (default, Sep  1 2016 23:33:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580

I also created a simple application (simple_app.py):

from pyspark import SparkContext

sc = SparkContext(appName = "simple-app")

numbers = [hash('foo') for i in range(10)]

print(numbers)

It also seems to work perfectly:

[hadoop@ip-*** ~]$ spark-submit --master yarn simple_app.py 

Output (truncated):

[...]
16/11/22 07:28:42 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
[-5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594] // THE RELEVANT LINE IS HERE.
16/11/22 07:28:42 INFO SparkContext: Invoking stop() from shutdown hook
[...]

As you can see, it also works, returning the same hash each time.

EDIT 2: From the comments, it seems like you are trying to compute hashes on the executors and not on the driver, so you'll need to set spark.executorEnv.PYTHONHASHSEED in your Spark application configuration so that it is propagated to the executors (this is one way to do it).

Note: setting environment variables for the executors works the same way in YARN client mode; use spark.executorEnv.[EnvironmentVariableName].

Hence the following minimalist example with simple_app.py:

from pyspark import SparkContext, SparkConf

conf = SparkConf().set("spark.executorEnv.PYTHONHASHSEED","123")
sc = SparkContext(appName="simple-app", conf=conf)

numbers = sc.parallelize(['foo']*10).map(lambda x: hash(x)).collect()

print(numbers)

And now let's test it again. Here is the truncated output:

16/11/22 14:14:34 INFO DAGScheduler: Job 0 finished: collect at /home/hadoop/simple_app.py:6, took 14.251514 s
[-5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594]
16/11/22 14:14:34 INFO SparkContext: Invoking stop() from shutdown hook
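
The same property can also be passed on the command line, e.g. spark-submit --conf spark.executorEnv.PYTHONHASHSEED=123 simple_app.py, which avoids hard-coding the seed in the application.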

I think that covers everything.



Answer 2:

From the Spark docs:

Note: When running Spark on YARN in cluster mode, environment variables need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property in your conf/spark-defaults.conf file. Environment variables that are set in spark-env.sh will not be reflected in the YARN Application Master process in cluster mode. See the YARN-related Spark Properties for more information.

The properties are listed here, so I think you want this one:

Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN.

spark.yarn.appMasterEnv.PYTHONHASHSEED="XXXX"

EMR docs for configuring spark-defaults.conf are here.

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.yarn.appMasterEnv.PYTHONHASHSEED": "XXX"
    }
  }
]
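
A quick way to check that the variable actually reaches both the driver and the executor Python workers is a throwaway job along these lines (the app name is arbitrary):

import os

from pyspark import SparkContext

sc = SparkContext(appName="hashseed-check")  # arbitrary app name

# Value seen by the driver process (the YARN application master in cluster mode).
print("driver PYTHONHASHSEED:", os.environ.get("PYTHONHASHSEED"))

# Values seen by the executor Python workers, one probe per partition.
values = set(
    sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
      .map(lambda _: os.environ.get("PYTHONHASHSEED"))
      .collect()
)
print("executor PYTHONHASHSEED values:", values)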


Answer 3:

Just encountered the same problem, adding the following configuration solved it:

# Some settings...
Configurations=[
        {
            "Classification": "spark-env",
            "Properties": {},
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {
                        "PYSPARK_PYTHON": "python34"
                    },
                    "Configurations": []
                }
            ]
        },
        {
            "Classification": "hadoop-env",
            "Properties": {},
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {
                        "PYTHONHASHSEED": "0"
                    },
                    "Configurations": []
                }
            ]
        }
        ],
# Some more settings...

Be careful: we do not use YARN as a cluster manager; for the moment the cluster is only running Hadoop and Spark.

EDIT: Following Tim B's comment, this also seems to work with YARN installed as a cluster manager.



Answer 4:

You could probably do it via a bootstrap script, but you'll need to do something like this:

echo "PYTHONHASHSEED=XXXX" >> /home/hadoop/.bashrc

(or possibly .profile)

So that it's picked up by the Spark processes when they are launched.

Your configuration looks reasonable, though; it might be worth setting it in the hadoop-env section instead.