How to upgrade a Data Pipeline definition from EMR 3.x to 4.x/5.x


Question:

I would like to upgrade my AWS Data Pipeline definition to EMR 4.x or 5.x so I can take advantage of newer Hive features (version 2.0+) such as CURRENT_DATE and CURRENT_TIMESTAMP.

The change from EMR 3.x to 4.x/5.x requires replacing amiVersion with releaseLabel in the EmrCluster object.
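
Concretely, the EmrCluster object from my definition below would change to something like this (note: from what I can tell, 4.x/5.x release labels no longer support previous-generation instance types such as m1.medium, so the m3.xlarge here is my assumption of a minimal replacement):

{
  "type": "EmrCluster",
  "name": "EmrClusterForImport",
  "id": "EmrCluster1",
  "coreInstanceType": "m3.xlarge",
  "coreInstanceCount": "1",
  "masterInstanceType": "m3.xlarge",
  "releaseLabel": "emr-4.1.0",
  "region": "us-east-1",
  "terminateAfter": "1 Hours"
}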

When I use "releaseLabel": "emr-4.1.0", I get the following error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.tez.TezTask
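
The error indicates Hive is running on the Tez execution engine under the new release. One workaround I am considering (unverified, just a sketch) is forcing Hive back to the MapReduce engine, since Data Pipeline supports EmrConfiguration and Property objects on releaseLabel clusters:

{
  "type": "EmrConfiguration",
  "id": "HiveSiteConfiguration",
  "name": "HiveSiteConfiguration",
  "classification": "hive-site",
  "property": [
    { "ref": "HiveEngineProperty" }
  ]
},
{
  "type": "Property",
  "id": "HiveEngineProperty",
  "name": "HiveEngineProperty",
  "key": "hive.execution.engine",
  "value": "mr"
}

These two objects would be referenced from EmrCluster1 by adding "configuration": { "ref": "HiveSiteConfiguration" } to it.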

Below is my Data Pipeline definition for EMR 3.x. It works well, so I hope others find it useful (along with an answer for EMR 4.x/5.x): the common recommendation for importing data into DynamoDB from a file is to use Data Pipeline, yet hardly anyone has put forward a solid, simple working example (say, for a custom data format).

{
  "objects": [
    {
      "type": "DynamoDBDataNode",
      "id": "DynamoDBDataNode1",
      "name": "OutputDynamoDBTable",
      "dataFormat": {
        "ref": "DynamoDBDataFormat1"
      },
      "region": "us-east-1",
      "tableName": "testImport"
    },
    {
      "type": "Custom",
      "id": "Custom1",
      "name": "InputCustomFormat",
      "column": [
        "firstName", "lastName"
      ],
      "columnSeparator" : "|",
      "recordSeparator" : "\n"
    },
    {
      "type": "S3DataNode",
      "id": "S3DataNode1",
      "name": "InputS3Data",
      "directoryPath": "s3://data.domain.com",
      "dataFormat": {
        "ref": "Custom1"
      }
    },
    {
      "id": "Default",
      "name": "Default",
      "scheduleType": "ondemand",
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs.data.domain.com"
    },
    {
      "type": "HiveActivity",
      "id": "HiveActivity1",
      "name": "S3ToDynamoDBImportActivity",
      "output": {
        "ref": "DynamoDBDataNode1"
      },
      "input": {
        "ref": "S3DataNode1"
      },
      "hiveScript": "INSERT OVERWRITE TABLE ${output1} SELECT reflect('java.util.UUID', 'randomUUID') as uuid, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP())) as loadDate, firstName, lastName FROM ${input1};",
      "runsOn": {
        "ref": "EmrCluster1"
      }
    },
    {
      "type": "EmrCluster",
      "name": "EmrClusterForImport",
      "id": "EmrCluster1",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "amiVersion": "3.11.0",
      "region": "us-east-1",
      "terminateAfter": "1 Hours"
    },
    {
      "type": "DynamoDBDataFormat",
      "id": "DynamoDBDataFormat1",
      "name": "OutputDynamoDBDataFormat",
      "column": [
        "uuid", "loadDate", "firstName", "lastName"
      ]
    }
  ],
  "parameters": []
}
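
For anyone reproducing this, the definition can be loaded and run with the AWS CLI (the pipeline name and the df-... pipeline ID below are placeholders):

aws datapipeline create-pipeline --name S3ToDynamoDBImport --unique-id S3ToDynamoDBImport
aws datapipeline put-pipeline-definition --pipeline-id df-EXAMPLE123 --pipeline-definition file://pipeline.json
aws datapipeline activate-pipeline --pipeline-id df-EXAMPLE123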

A sample input file could look like this:

John|Doe
Jane|Doe
Carl|Doe
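
To reproduce, the file just needs to land under the directoryPath above, e.g. via the AWS CLI (names.txt is a hypothetical filename):

aws s3 cp names.txt s3://data.domain.com/names.txt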

Bonus: rather than computing CURRENT_DATE in a column expression, how can I set it as a variable in the hiveScript section? I tried SET loadDate = CURRENT_DATE;\n\n INSERT OVERWRITE..." to no avail. Not shown in my example are other dynamic fields I would like to set before the query clause.
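
My current theory (unverified) is that SET only stores literal text and a bare identifier is never substituted into the query, so the variable needs Hive's textual substitution syntax. Written as a plain Hive script (in the pipeline JSON it would be a single hiveScript string with \n separators), the pattern I plan to try next looks like this:

-- SET with hivevar stores the literal text "CURRENT_DATE"; the
-- ${hivevar:...} reference below is expanded textually before the
-- query is parsed, so Hive (1.2+/2.x) evaluates CURRENT_DATE per row.
SET hivevar:loadDate=CURRENT_DATE;

INSERT OVERWRITE TABLE ${output1}
SELECT reflect('java.util.UUID', 'randomUUID') AS uuid,
       ${hivevar:loadDate} AS loadDate,
       firstName,
       lastName
FROM ${input1};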