PySpark RDD isCheckpointed() is False

Posted 2020-04-21 09:23

Question:

I was encountering StackOverflowErrors when iteratively adding over 500 columns to my PySpark DataFrame, so I added checkpoints. The checkpoints did not help, so I created the following toy application to test whether they were working correctly. All I do in this example is iteratively create columns by copying the original column over and over again. I persist, checkpoint and count every 10 iterations. I notice that my dataframe.rdd.isCheckpointed() always returns False, even though I can verify that the checkpoint folders are indeed being created and populated on disk. I am running on Dataproc on gcloud.

Here is my code:

from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys

APP_NAME = "isCheckPointWorking"

spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .config("spark.sql.crossJoin.enabled","true")\
    .getOrCreate()

sc = SparkContext.getOrCreate()

#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')

#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()

#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40) 
colNewList = ['col'+str(x) for x in numberList]

print(colNewList)

iterCount = 0

for colName in colNewList:

    #copy column A in to the new column
    df4 = df4.withColumn(colName,df4.A)

    if (np.mod(iterCount,10) == 0):           
        df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)      

        df4.checkpoint(eager=True)

        df4.count()    
        #checking if underlying RDD is being checkpointed        
        print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))

    iterCount +=1

It is unclear why df4.rdd.isCheckpointed() is returning False each time, when I can see that the checkpoint folder is being populated. Any thoughts?

Answer 1:

The checkpoint method returns a new checkpointed Dataset; it does not modify the current Dataset.

Change

df4.checkpoint(eager=True)

To

df4 = df4.checkpoint(eager=True)