Firehose JSON -> S3 Parquet -> ETL Spark, error: U

2019-07-08 10:31发布

问题:

It seems like this should be easy, like it's a core use case of this set of features, but it's been problem after problem.

The latest is in trying to run commands via a Glue Dev endpoint (both the PySpark and Scala end-points).

Following the instructions here: https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")

generates this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/dynamicframe.py", line 557, in from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/context.py", line 136, in create_dynamic_frame_from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

It also generates this warning, in one of the setup lines:

18/06/26 19:09:35 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

The overall setup it pretty simple: we have an incoming Kinesis data stream, a processor for that stream that produces a JSON Kinesis data stream, a Kinesis firehose stream configured to write that JSON stream to Parquet files in S3, and then the required Glue catalogue configurations to make that happen.

Athena can see the data just fine, but the Scala/PySpark scripts error out.

Any ideas / suggestions?

回答1:

Okay, still not clear why this was happening, but, found a fix!

Basically, instead of using the generated code:

val datasource0 = glueContext.getCatalogSource(
        database = "db",
        tableName = "myTable",
        redshiftTmpDir = "",
        transformationContext = "datasource0"
    ).getDynamicFrame()

use this code:

val crawledData = glueContext.getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(s"""{"path": "s3://mybucket/mytable/*/*/*/*/"}"""),
        format = "parquet",
        transformationContext = "source"
    ).getDynamicFrame()

The key bit here seemed to be the */*/*/*/ - if I just specified the root folder, I would get the Parquet error, and (apparently) the normal /**/* wildcard wouldn't work.