Unable to run scripts properly in AWS Glue PySpark

Posted 2019-07-03 16:15

Question:

I've configured an AWS Glue dev endpoint and can connect to it successfully in a pyspark REPL shell, as described in https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

Unlike the example in the AWS documentation, I receive warnings when I begin the session, and later various operations on AWS Glue DynamicFrame structures fail. Here's the full log from starting the session; note the warnings about spark.yarn.jars and PyGlue.zip:

Python 2.7.12 (default, Sep  1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/02 14:18:58 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
18/03/02 14:19:03 WARN Client: Same path resource file:/usr/share/aws/glue/etl/python/PyGlue.zip added multiple times to distributed cache.
18/03/02 14:19:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Sep  1 2016 22:14:00)
SparkSession available as 'spark'.
>>>

Many operations work as I expect, but I also receive some unwelcome exceptions. For example, I can load data from my Glue catalog and inspect its structure and the data within, but I can't apply a map to it or convert it to a DataFrame. Here's my full execution log (apart from the longest error message). The setup and the first few commands all work well, but the final two operations fail:

>>> import sys
>>> from awsglue.transforms import *
>>> from awsglue.utils import getResolvedOptions
>>> from pyspark.context import SparkContext
>>> from awsglue.context import GlueContext
>>> from awsglue.job import Job
>>>
>>> glueContext = GlueContext(spark)
>>> # Receives a string of the format yyyy-mm-dd hh:mi:ss.nnn and returns the first 10 characters: yyyy-mm-dd
... def TruncateTimestampString(ts):
...   ts = ts[:10]
...   return ts
...
>>> TruncateTimestampString('2017-03-05 06:12:08.376')
'2017-03-05'
>>>
>>> # Given a record with a timestamp property returns a record with a new property, day, containing just the date portion of the timestamp string, expected to be yyyy-mm-dd.
... def TruncateTimestamp(rec):
...   rec["day"] = TruncateTimestampString(rec["timestamp"])
...   return rec
...
>>> # Get the history datasource - WORKS WELL BUT LOGS log4j2 ERROR
>>> datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>>> # Tidy the history datasource - WORKS WELL
>>> history_tidied = datasource_history_1.drop_fields(['etag', 'jobmaxid', 'jobminid', 'filename']).rename_field('id', 'history_id')
>>> history_tidied.printSchema()
root
|-- jobid: string
|-- spiderid: long
|-- timestamp: string
|-- history_id: long

>>> # Trivial observation of the SparkSession objects
>>> SparkSession
<class 'pyspark.sql.session.SparkSession'>
>>> spark
<pyspark.sql.session.SparkSession object at 0x7f8668f3b650>
>>> 
>>> 
>>> # Apply a mapping to the tidied history datasource. FAILS
>>> history_mapped = history_tidied.map(TruncateTimestamp)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 101, in map
File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 105, in mapPartitionsWithIndex
File "/usr/lib/spark/python/pyspark/rdd.py", line 2419, in __init__
    self._jrdd_deserializer = self.ctx.serializer
AttributeError: 'SparkSession' object has no attribute 'serializer'
>>> history_tidied.toDF()
ERROR
Huge error log and stack trace follows, longer than my console can remember. Here's how it finishes:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-1f0341db-5de6-4008-974f-a1d194524a86/userFiles-6a67bdee-7c44-46d6-a0dc-9daa7177e7e2/PyGlue.zip/awsglue/dynamicframe.py", line 128, in toDF
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"

I think I'm following Amazon's Dev Endpoint REPL instructions, but with these fairly simple operations (DynamicFrame.map and DynamicFrame.toDF) failing, I'm working in the dark when I want to run the job for real. The job seems to succeed, but the output of my DynamicFrame.printSchema() and DynamicFrame.show() calls doesn't show up in the CloudWatch logs for the execution.
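
As a stopgap, the per-record logic can at least be exercised locally without the endpoint. This is only a sketch: it assumes the records handed to the map function behave like Python dicts, it uses quoted string keys for the field names, and the sample record is made up for illustration.

```python
def truncate_timestamp_string(ts):
    """Return the first 10 characters (yyyy-mm-dd) of a timestamp string."""
    return ts[:10]


def truncate_timestamp(rec):
    """Add a 'day' field holding the date portion of rec['timestamp']."""
    rec["day"] = truncate_timestamp_string(rec["timestamp"])
    return rec


# Hypothetical sample record; the field names follow the printed schema.
sample = {
    "jobid": "job-1",
    "spiderid": 7,
    "timestamp": "2017-03-05 06:12:08.376",
    "history_id": 1,
}

# Work on a copy so the sample itself is left untouched.
result = truncate_timestamp(dict(sample))
print(result["day"])
```

This checks only the Python function, not the DynamicFrame machinery that fails in the REPL.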

Does anyone know what I need to do to fix my REPL environment so that I can properly test pyspark AWS Glue scripts?

Answer 1:

Is the development endpoint configured on Windows or in a Unix environment? It looks like the downloaded files were not copied to the right location on Linux, or the code is unable to find the path to the jars. Please check that the files are there.

I have set up a Zeppelin notebook on my Windows machine and am able to connect to the Glue catalog successfully. Maybe you can try that as well; let me know if you need any help.



Answer 2:

AWS Support finally responded to my query on this issue. Here's the response:

On researching further, I found that this is a known issue with the PySpark shell, and the Glue service team is already working on it. The fix should be deployed soon, but currently there's no ETA that I can share with you.

Meanwhile here's a workaround: before initializing Glue context, you can do

>>> newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
>>> sc.stop()
>>> sc = sc.getOrCreate(newconf)

and then instantiate glueContext from that sc.
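
The same three steps can be wrapped in a small helper. This is a sketch, not something AWS Support supplied: the function name and the constant are hypothetical, and it uses the public sc.getConf() accessor instead of sc._conf, on the assumption that the installed PySpark exposes it and that the returned copy carries the same settings.

```python
CATALOG_KEY = "spark.sql.catalogImplementation"


def restart_with_in_memory_catalog(sc):
    """Stop `sc` and return a context configured with the in-memory catalog.

    `sc` is expected to be a pyspark SparkContext (or anything offering
    getConf(), stop() and a getOrCreate(conf) classmethod).
    """
    # Copy the current configuration and override the catalog implementation.
    conf = sc.getConf().set(CATALOG_KEY, "in-memory")
    # The running context has to be stopped before a new one can be created.
    sc.stop()
    # getOrCreate is a classmethod, so call it on the class rather than
    # on the stopped instance.
    return type(sc).getOrCreate(conf)
```

In the shell this would be called as sc = restart_with_in_memory_catalog(sc), followed by GlueContext(sc).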

I can confirm this works for me. Here's a script I was able to run:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Workaround recommended by AWS Support, 2018-03-22.
# Assumes `sc` is the SparkContext that the pyspark shell predefines.
newconf = sc._conf.set("spark.sql.catalogImplementation", "in-memory")
sc.stop()
sc = sc.getOrCreate(newconf)
# End AWS Support workaround

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

datasource_history_1 = glueContext.create_dynamic_frame.from_catalog(database = "dev", table_name = "history", transformation_ctx = "datasource_history_1")

def DoNothingMap(rec):
  return rec

history_mapped = datasource_history_1.map(DoNothingMap)
history_df = history_mapped.toDF()
history_df.show()
history_df.printSchema()

Before applying the workaround, the .map() and .toDF() calls failed.
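
One other difference from the session in the question may matter: this script passes sc to GlueContext, whereas the earlier session passed spark, and the AttributeError there complains about a SparkSession lacking a serializer attribute. A hedged sketch of a guard (the helper name is hypothetical) that accepts either object and returns the SparkContext, relying on the sparkContext property that SparkSession exposes:

```python
def as_spark_context(session_or_context):
    """Return the SparkContext whether given a SparkSession or a SparkContext."""
    # A SparkSession exposes its context as the `sparkContext` property.
    # A SparkContext has no such attribute, so it is returned unchanged.
    return getattr(session_or_context, "sparkContext", session_or_context)
```

With this, GlueContext(as_spark_context(spark)) and GlueContext(as_spark_context(sc)) would both hand a SparkContext to the constructor.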

I've asked AWS Support to notify me when this issue has been resolved so that the workaround is no longer required.