spark.sql.codegen is not giving any improvement

Published 2019-05-21 11:39

Question:

I am executing a Spark SQL query like the one below. The table data is stored in Hive tables on 2 different nodes.

Because the query is a bit slow, I tried to find options in Spark that would make it execute faster. I found that we can set spark.sql.codegen and spark.sql.inMemoryColumnarStorage.compressed to true instead of their default false.

But I am not getting any improvement: with these two options set to true the query takes 4.1 minutes to execute, and with them set to false it also takes 4.1 minutes.

Do you understand why these options are not having any effect?

   query = hiveContext.sql("""select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus""");

query.collect();
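
(For reference, one way to toggle these two options is setConf on the context. This is only a minimal sketch, written in Scala like the answer below; the PySpark calls are analogous, and the actual mechanism used, e.g. --conf on spark-submit, may differ:)

    // toggle the two options on an existing HiveContext / SQLContext (Spark 1.x)
    hiveContext.setConf("spark.sql.codegen", "true")
    hiveContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")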

Answer 1:

  • spark.sql.codegen.wholeStage is enabled by default as of Spark 2.0, and it performs all the internal optimizations possible on the Spark Catalyst side.

  • spark.sql.codegen (a feature since Spark 1.3) is false by default. Even if you set it to true, you can cross-check whether it takes effect with DF.explain / debug, as in the sketch below this list.
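
A quick way to do that cross-check is to print the query plans. A minimal sketch (checkDF is just an illustrative name; in Spark 2.x, operators covered by whole-stage codegen are prefixed with '*' in the physical plan):

    // build any DataFrame over the query and inspect its plans
    val checkDF = hiveContext.sql("select l_returnflag, count(*) from lineitem group by l_returnflag")
    checkDF.explain(true) // prints parsed, analyzed, optimized and physical plans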

However, please revisit the approach explained below, which targets Spark 2+.

If you are using a lower version of Spark, i.e. 1.3 or 1.4+, the same DataFrame approach is valid, except that it has to be used with a hiveContext; see the sketch right below.
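
For those versions the setup would look roughly like this (a minimal sketch, not the answer's exact code; lineitemDF is an illustrative name):

    // Spark 1.3/1.4 style: build a HiveContext on top of the SparkContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("Spark Hive Aggregations"))
    val hiveContext = new HiveContext(sc)
    // from here on, the same DataFrame API as below applies
    val lineitemDF = hiveContext.sql("select * from lineitem where l_shipdate <= '1998-09-16'")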


  • In my experience, the Dataset[Row] (aka DataFrame) version of the above query is a bit faster than the plain Hive query.

Please try the pseudo code below.

First, create a DataFrame without any aggregation, group by, or order by, like this:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Aggregations")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

// project only the columns needed downstream; note that l_tax is required for sum_charge
val df: DataFrame = sql("""select l_returnflag, l_linestatus, l_quantity,
        l_extendedprice, l_discount, l_tax
    from lineitem
    where l_shipdate <= '1998-09-16'""")

// can use a Spark UDF or when(cond, value) instead of a direct expression
val df1 = df
  .withColumn("disc_price", col("l_extendedprice") * (lit(1) - col("l_discount")))
  .withColumn("charge", col("l_extendedprice") * (lit(1) - col("l_discount")) * (lit(1) + col("l_tax")))

// NOW SUM, AVG and group by on the DataFrame
val groupeddf = df1
  .groupBy(col("l_returnflag"), col("l_linestatus"))
  .agg(
      sum("l_quantity").as("sum_qty")
    , sum("l_extendedprice").as("sum_base_price")
    , sum("disc_price").as("sum_disc_price")
    , sum("charge").as("sum_charge")
    , avg("l_quantity").as("avg_qty")
    , avg("l_extendedprice").as("avg_price")
    , avg("l_discount").as("avg_disc")
    , count("*").as("count_order")
  ) // end agg
  // order by on the DataFrame (sort is an alias of orderBy; one call is enough)
  .orderBy("l_returnflag", "l_linestatus")

val finalDF = groupeddf.select("l_returnflag", "l_linestatus", "sum_qty",
  "sum_base_price", "sum_disc_price", "sum_charge", "avg_qty", "avg_price",
  "avg_disc", "count_order")
  • Also, parameters like executor memory and the number of executors/cores need to be considered to find the exact issue; an illustrative sketch follows.
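
For instance, those resources can be set when building the session (illustrative values only; spark.executor.instances applies when running under YARN, and the right numbers depend on your cluster and data size):

    // illustrative resource settings -- tune for your cluster
    val tunedSpark = SparkSession.builder()
      .appName("Spark Hive Aggregations")
      .config("spark.executor.memory", "4g")   // memory per executor
      .config("spark.executor.instances", "4") // number of executors (YARN)
      .config("spark.executor.cores", "4")     // cores per executor
      .enableHiveSupport()
      .getOrCreate()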