为什么SparkSession一个动作执行两次?为什么SparkSession一个动作执行两次?(W

2019-05-12 08:40发布

最近升级到2.0星火,并试图创建JSON字符串一个简单的数据集时,我看到一些奇怪的行为。 这里有一个简单的测试案例:

 SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate();
 JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

 JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
            "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}",
            "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}"
         ));

 JavaRDD<String> mappedRdd = rdd.map(json -> {
     System.out.println("mapping json: " + json);
     return json;
 });

 Dataset<Row> data = spark.read().json(mappedRdd);
 data.show();

和输出:

mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
+----+--------------------+--------+
|name|               roles|   title|
+----+--------------------+--------+
| tom|[designer, develo...|engineer|
|jack| [designer, manager]|     cto|
+----+--------------------+--------+

看来,正在执行的“地图”功能的两倍,即使我只执行一个动作。 我认为,星火会懒洋洋地建立一个执行计划,需要的时候再执行它,但是这使得它看起来是为了读取数据,JSON和用它做任何事情,该计划将不得不至少两次执行。

在这个简单的例子没关系,但当地图功能是长时间运行,这将成为一个大问题。 这是正确的,还是我失去了一些东西?

Answer 1:

这是因为您没有提供架构DataFrameReader 。 其结果是火花必须热切扫描数据集来推断输出模式。

由于mappedRdd是不缓存它会被两次评估:

  • 一次架构推断
  • 一旦当你调用data.show

如果你想阻止你应该为读者(Scala的语法)模式:

val schema: org.apache.spark.sql.types.StructType = ???
spark.read.schema(schema).json(mappedRdd)


文章来源: Why does SparkSession execute twice for one action?