Parquet Data timestamp columns INT96 not yet imple

Posted 2020-04-16 01:56

Context:

I can submit a MapReduce job from the Druid overlord to an EMR cluster. My data source is in S3 in Parquet format. The Parquet data has a timestamp column of type INT96, which the Avro schema converter does not support.

The error occurs while parsing the timestamp.

The stack trace is:

Error: java.lang.IllegalArgumentException: INT96 not yet implemented.
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:279)
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:264)
at org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:223)

Environment:

Druid version: 0.11
EMR version: emr-5.11.0
Hadoop version: Amazon 2.7.3

Druid input JSON:

{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
        "paths": "s3://s3_path"
      }
    },
    "dataSchema": {
      "dataSource": "parquet_test1",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "ALL",
        "intervals": ["2017-08-01T00:00:00/2017-08-02T00:00:00"]
      },
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "t",
            "format": "yyyy-MM-dd HH:mm:ss:SSS zzz"            
          },
          "dimensionsSpec": {
            "dimensions": [
              "dim1","dim2","dim3"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "count",
          "name": "pid",
          "fieldName": "pid"
        }
      ]
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties" : {
        "mapreduce.job.user.classpath.first": "true",
        "fs.s3.awsAccessKeyId" : "KEYID",
        "fs.s3.awsSecretAccessKey" : "AccessKey",
        "fs.s3.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3n.awsAccessKeyId" : "KEYID",
        "fs.s3n.awsSecretAccessKey" : "AccessKey",
        "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "io.compression.codecs" : "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
      },
      "leaveIntermediate": true
    }
  },
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3", "org.apache.hadoop:hadoop-aws:2.7.3", "com.hadoop.gplcompression:hadoop-lzo:0.4.20"]
}

Possible solutions:

 1. Read the Parquet data directly instead of converting it to Avro, which removes the Avro dependency.

 2. Fix AvroSchemaConverter to support Parquet's INT96 timestamp format.
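
For the second option, any fix would need to decode the raw INT96 value into something Avro can represent, such as a long of epoch milliseconds. The following is a minimal sketch of that decoding step only, assuming the column uses the layout commonly written by Impala, Hive, and Spark: 12 bytes, little-endian, with 8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number. The class name `Int96Timestamp` and its methods are hypothetical and are not part of Druid or parquet-avro; the `encode` helper exists only to build sample input.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

// Hypothetical helper, not part of Druid or parquet-avro.
public class Int96Timestamp {
    // Julian day number of 1970-01-01 (the Unix epoch).
    public static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    public static final long MILLIS_PER_DAY = 86_400_000L;
    public static final long NANOS_PER_MILLI = 1_000_000L;

    // Decodes a 12-byte INT96 timestamp into milliseconds since the Unix epoch.
    // Assumed layout: bytes 0-7 = nanoseconds of day, bytes 8-11 = Julian day,
    // both little-endian. Sub-millisecond precision is truncated.
    public static long toEpochMillis(byte[] int96) {
        if (int96 == null || int96.length != 12) {
            throw new IllegalArgumentException("INT96 value must be exactly 12 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();
        long julianDay = buf.getInt();
        return (julianDay - JULIAN_DAY_OF_EPOCH) * MILLIS_PER_DAY
                + nanosOfDay / NANOS_PER_MILLI;
    }

    // Builds a sample INT96 value in the same assumed layout (for demonstration only).
    public static byte[] encode(int julianDay, long nanosOfDay) {
        return ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(nanosOfDay)
                .putInt(julianDay)
                .array();
    }

    public static void main(String[] args) {
        // Julian day 2457967 is 2017-08-01, the start of the interval in the spec above.
        byte[] raw = encode(2_457_967, 0L);
        System.out.println(Instant.ofEpochMilli(toEpochMillis(raw)));
    }
}
```

If the values were decoded to epoch milliseconds like this before reaching Druid, the timestampSpec would presumably need a numeric format rather than the string pattern used above; that part is untested here.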
