I have the following simple code for loading a table from my Postgres database into an RDD.
# this setup is just for spark-submit, will be ignored in pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("GA")#.setMaster("localhost")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# func for loading table
def get_db_rdd(table):
url = "jdbc:postgresql://localhost:5432/harvest?user=postgres"
print(url)
lower = 0
upper = 1000
ret = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", table) \
.option("partitionColumn", "id") \
.option("numPartitions", 1024) \
.option("lowerBound", lower) \
.option("upperBound", upper) \
.option("password", "password") \
.load()
ret = ret.rdd
return ret
# load table, and print results
print(get_db_rdd("mytable").collect())
I run ./bin/pyspark
then paste that into the interpreter, and it prints out the data from my table as expected.
Now, if I save that code to a file named test.py
then do ./bin/spark-submit test.py
, it starts to run, but then I see these messages spam my console forever:
17/02/16 02:24:21 INFO Executor: Running task 45.0 in stage 0.0 (TID 45)
17/02/16 02:24:21 INFO JDBCRDD: closed connection
17/02/16 02:24:21 INFO Executor: Finished task 45.0 in stage 0.0 (TID 45). 1673 bytes result sent to driver
Edit: This is on a single machine. I haven't started any masters or slaves; spark-submit
is the only command I run after system start. I tried with the master/slave setup with the same results.
My spark-env.sh
file looks like this:
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=800m
export SPARK_EXECUTOR_MEMORY=800m
export SPARK_EXECUTOR_CORES=2
export SPARK_CLASSPATH=/home/ubuntu/spark/pg_driver.jar # Postgres driver I need for SQLContext
export PYTHONHASHSEED=1337 # have to make workers use same seed in Python3
It works if I spark-submit a Python file that just creates an RDD from a list or something. I only have problems when I try to use a JDBC RDD. What piece am I missing?