Spark SQL RDD loads in pyspark but not in spark-submit

Published 2019-09-06 20:21

Question:

I have the following simple code for loading a table from my Postgres database into an RDD.

# this setup is just for spark-submit, will be ignored in pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("GA")#.setMaster("localhost")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# func for loading table
def get_db_rdd(table):
    url = "jdbc:postgresql://localhost:5432/harvest?user=postgres"
    print(url)
    lower = 0
    upper = 1000
    ret = sqlContext \
      .read \
      .format("jdbc") \
      .option("url", url) \
      .option("dbtable", table) \
      .option("partitionColumn", "id") \
      .option("numPartitions", 1024) \
      .option("lowerBound", lower) \
      .option("upperBound", upper) \
      .option("password", "password") \
      .load()
    ret = ret.rdd
    return ret

# load table, and print results
print(get_db_rdd("mytable").collect())

I run ./bin/pyspark, paste that code into the interpreter, and it prints the data from my table as expected.

Now, if I save that code to a file named test.py and run ./bin/spark-submit test.py, it starts to run, but then these messages spam my console forever:

17/02/16 02:24:21 INFO Executor: Running task 45.0 in stage 0.0 (TID 45)
17/02/16 02:24:21 INFO JDBCRDD: closed connection
17/02/16 02:24:21 INFO Executor: Finished task 45.0 in stage 0.0 (TID 45). 1673 bytes result sent to driver

Edit: This is on a single machine. I haven't started any masters or slaves; spark-submit is the only command I run after system start. I also tried a master/slave setup, with the same results. My spark-env.sh file looks like this:

export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=800m
export SPARK_EXECUTOR_MEMORY=800m
export SPARK_EXECUTOR_CORES=2
export SPARK_CLASSPATH=/home/ubuntu/spark/pg_driver.jar # Postgres driver I need for SQLContext
export PYTHONHASHSEED=1337 # have to make workers use same seed in Python3

It works if I spark-submit a Python file that just creates an RDD from a list or something. I only have problems when I try to use a JDBC RDD. What piece am I missing?
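
For reference, a minimal spark-submit-able script of the RDD-from-a-list kind (the file and app names here are just illustrative) looks something like this:

# list_test.py -- runs fine with ./bin/spark-submit list_test.py
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ListRDDTest")
sc = SparkContext(conf=conf)

# build an RDD from a plain Python list and collect it back to the driver
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())

sc.stop()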

Answer 1:

When using spark-submit, you need to supply the JDBC driver JAR to the executors.

As mentioned in the Spark 2.1 JDBC documentation:

To get started you will need to include the JDBC driver for your particular database on the spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

Note: The same applies to the spark-submit command.
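
For this setup, that would look something like the following (a sketch using the driver JAR path from the question's spark-env.sh; adjust the paths to your environment):

bin/spark-submit \
  --driver-class-path /home/ubuntu/spark/pg_driver.jar \
  --jars /home/ubuntu/spark/pg_driver.jar \
  test.py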

Troubleshooting

The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.
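
As an alternative sketch (not from the original answer), the same classpath can be supplied through Spark configuration properties instead of command-line flags, for example in conf/spark-defaults.conf:

spark.driver.extraClassPath    /home/ubuntu/spark/pg_driver.jar
spark.executor.extraClassPath  /home/ubuntu/spark/pg_driver.jar

Either way, the point is that both the driver and every executor JVM must be able to see the Postgres driver class.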



Answer 2:

This is a horrible hack. I'm not considering this the answer, but it does work.

Alright, only pyspark works? Fine, then we'll use it. I wrote this Bash script (pyspark_wrapper.sh):

#!/bin/bash
cat "$1" | "$SPARK_HOME/bin/pyspark"  # pipe the submitted Python file into the pyspark REPL

I call that script from the Python script that submits my jobs. I'm also including the code I use to pass arguments between the processes, in case it helps someone:

import os
import subprocess

new_env = os.environ.copy()
new_env["pyspark_argument_1"] = "some param I need in my Spark script"  # etc...
p = subprocess.Popen(["pyspark_wrapper.sh {}".format(py_fname)], shell=True, env=new_env)  # py_fname: path to the Spark script

In my Spark script:

import os

something_passed_from_submitter = os.environ["pyspark_argument_1"]
# do stuff in Spark...

I feel like Spark is better supported and (if this is a bug) less buggy with Scala than with Python 3, so that might be the better solution for now. But my script uses some files we wrote in Python 3, so...