PySpark: overwrite a file added with sc.addPyFile

Published 2020-07-24 05:23

Question:

I have these 2 files saved under this path:

C:\code\sample1\main.py

def method():
    return "this is sample method 1"

C:\code\sample2\main.py

def method():
    return "this is sample method 2"

and then I run this:

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

sc.addPyFile("~/code/sample1/main.py")
main1 = __import__("main")
print(main1.method()) # this is sample method 1

sc.addPyFile("~/code/sample2/main.py") # Error

The error is

Py4JJavaError: An error occurred while calling o21.addFile. : org.apache.spark.SparkException: File C:\Users\hans.yulian\AppData\Local\Temp\spark-5da165cf-410f-4576-8124-0ab23aba6aa3\userFiles-25a7ca23-84fb-42b7-95d9-206867fb9dfd\main.py exists and does not match contents of /C:/Users/hans.yulian/Documents/spark-test/main2/main.py

Which means that it already has a "main.py" file in its temporary folder and the content is different. I wonder if there is any workaround for this case, but I have these limitations:

  1. The file name still has to be "main.py"; only the folder can be different
  2. It's okay to somehow purge the temporary folder so the other file can be added again
  3. The only solution I have is to append a random string in front of main.py, for example abcdemain.py and fghijmain.py, and then import it with main = __import__("abcdemain"), but this is not really preferable; a rough sketch of that approach is shown below
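
For what it's worth, a rough sketch of that renaming workaround (the helper name, the uuid-based prefix and the temporary-copy location below are only illustrative, not part of my actual code) could look like this:

import os
import shutil
import sys
import tempfile
import uuid

from pyspark import SparkContext

sc = SparkContext()

def add_main_with_unique_name(sc, path):
    # Copy main.py under a unique module name so that repeated addPyFile()
    # calls never collide on the file name "main.py".
    module_name = "main_" + uuid.uuid4().hex[:8]     # e.g. main_1a2b3c4d
    copy_dir = tempfile.mkdtemp()
    copy_path = os.path.join(copy_dir, module_name + ".py")
    shutil.copyfile(path, copy_path)                 # "rename" by copying
    sc.addPyFile(copy_path)                          # ship the renamed copy
    if copy_dir not in sys.path:                     # make it importable on the driver too
        sys.path.append(copy_dir)
    return __import__(module_name)

main1 = add_main_with_unique_name(sc, "C:/code/sample1/main.py")
print(main1.method())   # this is sample method 1

main2 = add_main_with_unique_name(sc, "C:/code/sample2/main.py")
print(main2.method())   # this is sample method 2

Each copy gets its own module name, so addPyFile never sees two different files both called main.py.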

Answer 1:

While it is technically possible, by setting spark.files.overwrite to "true":

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true"))

and in simple cases it will give correct results:

def f(*_):                                                                   
    from main import method
    return [method()]

sc.addFile("/path/to/sample1/main.py") 
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 1',
'this is sample method 1',
'this is sample method 1']
sc.addFile("/path/to/sample2/main.py")

sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 2',
 'this is sample method 2',
 'this is sample method 2']

it won't be reliable in practice, even if you reload modules on each access, and it will make your application hard to reason about. Since Spark may implicitly cache certain objects or transparently restart Python workers, you can easily end up in a situation where different nodes see a different state of the source.
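
For completeness, "reload modules on each access" could be sketched roughly as follows (using importlib.reload inside the task; this is only an illustration and does not remove the caching and worker-restart caveats described above):

import importlib

def f(_):
    # Import and immediately reload "main" inside the task, so each call
    # re-reads whatever copy of main.py is currently present on the worker.
    import main
    importlib.reload(main)
    return [main.method()]

sc.addFile("/path/to/sample2/main.py")
sc.parallelize([], 3).mapPartitions(f).collect()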