I would like to upgrade my AWS Data Pipeline definition to EMR 4.x or 5.x so that I can take advantage of Hive's latest features (version 2.0+), such as CURRENT_DATE, CURRENT_TIMESTAMP, etc.
The change from EMR 3.x to 4.x/5.x requires the use of releaseLabel in EmrCluster instead of amiVersion.
When I use "releaseLabel": "emr-4.1.0", I get the following error: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
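To make the change concrete, here is a minimal sketch of the EmrCluster object with amiVersion swapped for releaseLabel. It assumes every other field stays exactly as in the EMR 3.x definition below; that is an assumption for illustration, not a confirmed working configuration, and this is the kind of variant that leads to the TezTask error.

```json
{
  "type": "EmrCluster",
  "name": "EmrClusterForImport",
  "id": "EmrCluster1",
  "coreInstanceType": "m1.medium",
  "coreInstanceCount": "1",
  "masterInstanceType": "m1.medium",
  "releaseLabel": "emr-4.1.0",
  "region": "us-east-1",
  "terminateAfter": "1 Hours"
}
```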
Below is my Data Pipeline definition for EMR 3.x. It works well, so I hope others find it useful (along with the answer for EMR 4.x/5.x). The common recommendation for importing data into DynamoDB from a file is to use Data Pipeline, but no one seems to have put forward a solid, simple working example (say, for a custom data format).
{
  "objects": [
    {
      "type": "DynamoDBDataNode",
      "id": "DynamoDBDataNode1",
      "name": "OutputDynamoDBTable",
      "dataFormat": {
        "ref": "DynamoDBDataFormat1"
      },
      "region": "us-east-1",
      "tableName": "testImport"
    },
    {
      "type": "Custom",
      "id": "Custom1",
      "name": "InputCustomFormat",
      "column": [
        "firstName", "lastName"
      ],
      "columnSeparator": "|",
      "recordSeparator": "\n"
    },
    {
      "type": "S3DataNode",
      "id": "S3DataNode1",
      "name": "InputS3Data",
      "directoryPath": "s3://data.domain.com",
      "dataFormat": {
        "ref": "Custom1"
      }
    },
    {
      "id": "Default",
      "name": "Default",
      "scheduleType": "ondemand",
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs.data.domain.com"
    },
    {
      "type": "HiveActivity",
      "id": "HiveActivity1",
      "name": "S3ToDynamoDBImportActivity",
      "output": {
        "ref": "DynamoDBDataNode1"
      },
      "input": {
        "ref": "S3DataNode1"
      },
      "hiveScript": "INSERT OVERWRITE TABLE ${output1} SELECT reflect('java.util.UUID', 'randomUUID') as uuid, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP())) as loadDate, firstName, lastName FROM ${input1};",
      "runsOn": {
        "ref": "EmrCluster1"
      }
    },
    {
      "type": "EmrCluster",
      "name": "EmrClusterForImport",
      "id": "EmrCluster1",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "amiVersion": "3.11.0",
      "region": "us-east-1",
      "terminateAfter": "1 Hours"
    },
    {
      "type": "DynamoDBDataFormat",
      "id": "DynamoDBDataFormat1",
      "name": "OutputDynamoDBDataFormat",
      "column": [
        "uuid", "loadDate", "firstName", "lastName"
      ]
    }
  ],
  "parameters": []
}
A sample input file could look like this:
John|Doe
Jane|Doe
Carl|Doe
Bonus: rather than setting CURRENT_DATE in a column, how can I set it as a variable in the hiveScript section? I tried "SET loadDate = CURRENT_DATE;\n\n INSERT OVERWRITE..." to no avail. Not shown in my example are other dynamic fields that I would like to set before the query clause.