Parquet data timestamp columns: INT96 not yet implemented

Posted 2020-04-16 01:27

Question:

Context:

I am able to submit a MapReduce job from the Druid overlord to an EMR cluster. My data source is Parquet files in S3, and they contain a timestamp column stored as INT96, which the Avro schema converter does not support.

The error is thrown while parsing the timestamp column.

Stack trace:

Error: java.lang.IllegalArgumentException: INT96 not yet implemented.
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:279)
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:264)
at org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:223)
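
For background, INT96 is a legacy 12-byte timestamp encoding used by Impala and older Hive/Spark writers: eight little-endian bytes of nanoseconds-within-the-day followed by a four-byte Julian day number. It has no direct Avro equivalent, which is why AvroSchemaConverter gives up. A minimal decoding sketch in Java (the class and method names are mine, not part of parquet-avro):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96Decoder {
    // Decode one 12-byte Parquet INT96 value into epoch milliseconds.
    public static long int96ToEpochMillis(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();      // bytes 0-7: nanoseconds within the day
        int julianDay = buf.getInt();         // bytes 8-11: Julian day number
        long epochDay = julianDay - 2440588;  // 2440588 = Julian day of 1970-01-01
        return epochDay * 86_400_000L + nanosOfDay / 1_000_000L;
    }
}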

Environment:

Druid version: 0.11
EMR version: emr-5.11.0
Hadoop version: Amazon 2.7.3

Druid ingestion spec (JSON):

{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
        "paths": "s3://s3_path"
      }
    },
    "dataSchema": {
      "dataSource": "parquet_test1",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "ALL",
        "intervals": ["2017-08-01T00:00:00/2017-08-02T00:00:00"]
      },
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "t",
            "format": "yyyy-MM-dd HH:mm:ss:SSS zzz"            
          },
          "dimensionsSpec": {
            "dimensions": [
              "dim1","dim2","dim3"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [{
        "type": "count",
        "name": "count"
      },{
          "type" : "count",
          "name" : "pid",
          "fieldName" : "pid"
        }]
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties" : {
        "mapreduce.job.user.classpath.first": "true",
        "fs.s3.awsAccessKeyId" : "KEYID",
        "fs.s3.awsSecretAccessKey" : "AccessKey",
        "fs.s3.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.awsAccessKeyId" : "KEYID",
        "fs.s3n.awsSecretAccessKey" : "AccessKey",
        "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "io.compression.codecs" : "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
      },
      "leaveIntermediate": true
    }
  }, "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3", "org.apache.hadoop:hadoop-aws:2.7.3", "com.hadoop.gplcompression:hadoop-lzo:0.4.20"]
}

Possible solutions:

 1. Rewrite the Parquet data with the timestamp stored in a supported type (for example INT64 or a formatted string) instead of INT96, so no Avro conversion of INT96 is needed (see the Spark sketch after this list).

 2. Fix the Avro schema converter to support Parquet's INT96 timestamp type (see the parquet-avro sketch after this list).
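
For option 1, here is a minimal sketch of a Spark job that rewrites the data with "t" rendered as a plain string in the pattern the timestampSpec above expects, so the rewritten files contain no INT96 column. The app name and output path are placeholders; Spark decodes INT96 into a proper timestamp on read by default (spark.sql.parquet.int96AsTimestamp=true).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.date_format;

public class RewriteInt96Timestamps {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("rewrite-int96-timestamps") // placeholder app name
                .getOrCreate();

        // INT96 is decoded into a regular timestamp column on read.
        Dataset<Row> df = spark.read().parquet("s3://s3_path");

        // Render "t" as a string in the pattern the Druid timestampSpec expects,
        // so the rewritten files no longer carry an INT96 column.
        Dataset<Row> fixed = df.withColumn(
                "t", date_format(col("t"), "yyyy-MM-dd HH:mm:ss:SSS zzz"));

        fixed.write().parquet("s3://s3_path_fixed"); // placeholder output path
        spark.stop();
    }
}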
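
For option 2, note that newer parquet-avro releases (1.12+, via PARQUET-1928) added an opt-in property, parquet.avro.readInt96AsFixed, that maps INT96 to a 12-byte Avro fixed instead of throwing. A sketch, assuming the classpath can be upgraded to such a version (the file path is a placeholder):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadInt96AsFixed {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Surface INT96 as a 12-byte Avro "fixed" instead of throwing
        // "INT96 not yet implemented" (requires parquet-avro 1.12+).
        conf.setBoolean("parquet.avro.readInt96AsFixed", true);

        Path file = new Path("s3://s3_path/file.parquet"); // placeholder file
        try (ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(file).withConf(conf).build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                // "t" arrives as a GenericFixed holding the raw 12 INT96 bytes,
                // which can be decoded with the helper shown earlier.
            }
        }
    }
}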