Why is the .hive-staging file missing in AWS EMR?

Posted 2019-01-27 20:07

Question:

Problem -

I am running a query on AWS EMR, and it fails with this exception -

java.io.FileNotFoundException: File s3://xxx/yyy/internal_test_automation/2016/09/17/17156/data/feed/commerce_feed_redshift_dedup/.hive-staging_hive_2016-09-17_10-24-20_998_2833938482542362802-639 does not exist.

I have listed all the related information for this problem below. Please check.

Query -

INSERT OVERWRITE TABLE base_performance_order_dedup_20160917
SELECT 
*
 FROM 
(
select
commerce_feed_redshift_dedup.sku AS sku,
commerce_feed_redshift_dedup.revenue AS revenue,
commerce_feed_redshift_dedup.orders AS orders,
commerce_feed_redshift_dedup.units AS units,
commerce_feed_redshift_dedup.feed_date AS feed_date
from commerce_feed_redshift_dedup
) tb

Exception -

ERROR Error while executing queries
java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1474097800415_0311_2_00, diagnostics=[Vertex vertex_1474097800415_0311_2_00 [Map 1] killed/failed due to:ROOT_INPUT_INIT_FAILURE, Vertex Input: commerce_feed_redshift_dedup initializer failed, vertex=vertex_1474097800415_0311_2_00 [Map 1], java.io.FileNotFoundException: File s3://xxx/yyy/internal_test_automation/2016/09/17/17156/data/feed/commerce_feed_redshift_dedup/.hive-staging_hive_2016-09-17_10-24-20_998_2833938482542362802-639 does not exist.
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:987)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:929)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:339)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1530)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1556)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1601)
    at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:1778)
    at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1777)
    at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1755)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:239)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
    at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:363)
    at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:486)
    at org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator.initialize(HiveSplitGenerator.java:200)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:278)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable$1.run(RootInputInitializerManager.java:269)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:269)
    at org.apache.tez.dag.app.dag.RootInputInitializerManager$InputInitializerCallable.call(RootInputInitializerManager.java:253)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
]Vertex killed, vertexName=Reducer 2, vertexId=vertex_1474097800415_0311_2_01, diagnostics=[Vertex received Kill in INITED state., Vertex vertex_1474097800415_0311_2_01 [Reducer 2] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1
    at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:348)
    at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:251)
    at com.XXX.YYY.executors.HiveQueryExecutor.executeQueriesInternal(HiveQueryExecutor.java:234)
    at com.XXX.YYY.executors.HiveQueryExecutor.executeQueriesMetricsEnabled(HiveQueryExecutor.java:184)
    at com.XXX.YYY.azkaban.jobexecutors.impl.AzkabanHiveQueryExecutor.run(AzkabanHiveQueryExecutor.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at azkaban.jobtype.JavaJobRunnerMain.runMethod(JavaJobRunnerMain.java:192)
    at azkaban.jobtype.JavaJobRunnerMain.<init>(JavaJobRunnerMain.java:132)
    at azkaban.jobtype.JavaJobRunnerMain.main(JavaJobRunnerMain.java:76)

Hive configuration properties that I set before executing the above query -

set hivevar:hive.mapjoin.smalltable.filesize=2000000000
set hivevar:mapreduce.map.speculative=false
set hivevar:mapreduce.output.fileoutputformat.compress=true
set hivevar:hive.exec.compress.output=true
set hivevar:mapreduce.task.timeout=6000000
set hivevar:hive.optimize.bucketmapjoin.sortedmerge=true
set hivevar:io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec
set hivevar:hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat
set hivevar:hive.auto.convert.sortmerge.join.noconditionaltask=false
set hivevar:FEED_DATE=20160917
set hivevar:hive.optimize.bucketmapjoin=true
set hivevar:hive.exec.compress.intermediate=true
set hivevar:hive.enforce.bucketmapjoin=true
set hivevar:mapred.output.compress=true
set hivevar:mapreduce.map.output.compress=true
set hivevar:hive.auto.convert.sortmerge.join=false
set hivevar:hive.auto.convert.join=false
set hivevar:mapreduce.reduce.speculative=false
set hivevar:PD_KEY=vijay-test-mail@XXX.pagerduty.com
set hivevar:mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
set hive.mapjoin.smalltable.filesize=2000000000
set mapreduce.map.speculative=false
set mapreduce.output.fileoutputformat.compress=true
set hive.exec.compress.output=true
set mapreduce.task.timeout=6000000
set hive.optimize.bucketmapjoin.sortedmerge=true
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat
set hive.auto.convert.sortmerge.join.noconditionaltask=false
set FEED_DATE=20160917
set hive.optimize.bucketmapjoin=true
set hive.exec.compress.intermediate=true
set hive.enforce.bucketmapjoin=true 
set mapred.output.compress=true 
set mapreduce.map.output.compress=true 
set hive.auto.convert.sortmerge.join=false 
set hive.auto.convert.join=false 
set mapreduce.reduce.speculative=false 
set PD_KEY=vijay-test-mail@XXX.pagerduty.com 
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

/etc/hive/conf/hive-site.xml

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-172-30-2-16.us-west-2.compute.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>tez</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-172-30-2-16.us-west-2.compute.internal:8020</value>
  </property>


  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://ip-172-30-2-16.us-west-2.compute.internal:9083</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-172-30-2-16.us-west-2.compute.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>mrN949zY9P2riCeY</value>
    <description>password to use against metastore database</description>
  </property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>15</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.async.log.enabled</name>
    <value>false</value>
  </property>

</configuration>

/etc/tez/conf/tez-site.xml

<configuration>
    <property>
    <name>tez.lib.uris</name>
    <value>hdfs:///apps/tez/tez.tar.gz</value>
  </property>

  <property>
    <name>tez.use.cluster.hadoop-libs</name>
    <value>true</value>
  </property>

  <property>
    <name>tez.am.grouping.max-size</name>
    <value>134217728</value>
  </property>

  <property>
    <name>tez.runtime.intermediate-output.should-compress</name>
    <value>true</value>
  </property>

  <property>
    <name>tez.runtime.intermediate-input.is-compressed</name>
    <value>true</value>
  </property>

  <property>
    <name>tez.runtime.intermediate-output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.LzoCodec</value>
  </property>

  <property>
    <name>tez.runtime.intermediate-input.compress.codec</name>
    <value>org.apache.hadoop.io.compress.LzoCodec</value>
  </property>

  <property>
    <name>tez.history.logging.service.class</name>
    <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
  </property>

  <property>
    <name>tez.tez-ui.history-url.base</name>
    <value>http://ip-172-30-2-16.us-west-2.compute.internal:8080/tez-ui/</value>
  </property>
</configuration>

Questions -

  1. Which process deleted this file? As far as Hive is concerned, the file should still be there. (Also, this file is not created by application code.)
  2. When I rerun the failed query a number of times, it eventually passes. Why is the behaviour inconsistent?
  3. I just upgraded hive-exec and hive-jdbc to version 2.1.0, so it seems that some Hive configuration properties are set incorrectly or are missing. Can you help me find them?

Note - I upgraded hive-exec from version 0.13.0 to 2.1.0. With the previous version, all queries worked fine.

Update-1

When I launched another cluster, it worked fine. I tested the same ETL 3 times.

When I did the same thing again on a new cluster, it showed the same exception. I cannot understand why the behaviour is inconsistent.

Please help me understand this inconsistency.

I am new to Hive, so I have little conceptual understanding of this.

Update-2-

HDFS logs under Cluster Public DNS Name:50070 -

2016-09-20 11:31:55,155 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy (IPC Server handler 11 on 8020): Failed to place enough replicas, still in need of 1 to reach 1 (unavailableStorages=[], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) For more information, please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy
2016-09-20 11:31:55,155 WARN org.apache.hadoop.hdfs.protocol.BlockStoragePolicy (IPC Server handler 11 on 8020): Failed to place enough replicas: expected size is 1 but only 0 storage types can be selected (replication=1, selected=[], unavailable=[DISK], removed=[DISK], policy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]})
2016-09-20 11:31:55,155 WARN org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy (IPC Server handler 11 on 8020): Failed to place enough replicas, still in need of 1 to reach 1 (unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}, newBlock=true) All required storage types are unavailable: unavailableStorages=[DISK], storagePolicy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]}
2016-09-20 11:31:55,155 INFO org.apache.hadoop.ipc.Server (IPC Server handler 11 on 8020): IPC Server handler 11 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 172.30.2.207:56462 Call#7497 Retry#0
java.io.IOException: File /user/hive/warehouse/bc_kmart_3813.db/dp_internal_temp_full_load_offer_flexibility_20160920/.hive-staging_hive_2016-09-20_11-17-51_558_1222354063413369813-58/_task_tmp.-ext-10000/_tmp.000079_0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

When I searched for this exception, I found this page - https://wiki.apache.org/hadoop/CouldOnlyBeReplicatedTo

In my cluster, there is one data node with 32 GB disk space.

/etc/hive/conf/hive-default.xml.template -

<property>
    <name>hive.exec.stagingdir</name>
    <value>.hive-staging</value>
    <description>Directory name that will be created inside table locations in order to support HDFS encryption. This is replaces ${hive.exec.scratchdir} for query results with the exception of read-only tables. In all cases ${hive.exec.scratchdir} is still used for other temporary files, such as job plans.</description>
  </property>

Questions-

  1. According to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-ip-172-30-2-189.log, the hive-staging folder is created on the cluster machine. Why is the same folder then also created in S3?

Update-3-

Some of the exceptions are of type LeaseExpiredException -

2016-09-21 08:53:17,995 INFO org.apache.hadoop.ipc.Server (IPC Server handler 13 on 8020): IPC Server handler 13 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.complete from 172.30.2.189:42958 Call#726 Retry#0: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /tmp/hive/hadoop/_tez_session_dir/6ebd2d18-f5b9-4176-ab8f-d6c78124b636/.tez/application_1474442135017_0022/recovery/1/summary (inode 20326): File does not exist. Holder DFSClient_NONMAPREDUCE_1375788009_1 does not have any open files.

Answer 1:

I resolved the issue. Let me explain in detail.

Exceptions that occur -

  1. LeaseExpiredException - from the HDFS side.
  2. FileNotFoundException - from the Hive side (when the Tez execution engine executes the DAG)

Problem scenario-

  1. We had just upgraded Hive from version 0.13.0 to 2.1.0. Everything worked fine with the previous version, with zero runtime exceptions.

Different thoughts to resolve the issue -

  1. My first thought was that two threads were working on the same piece of data because of NN intelligence (speculative execution). But with the settings below

    set mapreduce.map.speculative=false
    set mapreduce.reduce.speculative=false

that was not possible.

  2. Then I increased the count from 1000 to 100000 for the settings below -

    SET hive.exec.max.dynamic.partitions=100000;
    SET hive.exec.max.dynamic.partitions.pernode=100000;

That also didn't work.

  3. My third thought was that, within the same process, what mapper-1 had created was being deleted by another mapper/reducer. But we didn't find any such entries in the HiveServer2 or Tez logs.

  4. Finally, the root cause turned out to lie in the application layer code itself. hive-exec 2.1.0 has a configuration property that our previous version, 0.13.0, did not have

    "hive.exec.stagingdir":".hive-staging"

Description of the above property -

Directory name that will be created inside table locations in order to support HDFS encryption. This is replaces ${hive.exec.scratchdir} for query results with the exception of read-only tables. In all cases ${hive.exec.scratchdir} is still used for other temporary files, such as job plans.

So if there are concurrent jobs in the application layer code (ETL) that perform operations (rename/delete/move) on the same table, this problem can occur.

In our case, 2 concurrent jobs were doing "INSERT OVERWRITE" on the same table. One job deleted the metadata file of a mapper belonging to the other job, which caused this issue.

Resolution -

  1. Move the metadata file location outside the table location (the table lies in S3).
  2. Disable HDFS encryption (as mentioned in the description of the stagingdir property).
  3. Change your application layer code to avoid the concurrency issue.
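
Resolution 1 might look like the following in a Hive session. This is only a sketch: the path /tmp/hive/.hive-staging is an assumed example, not a value from the original setup, and how Hive resolves it (for example, against which filesystem) may depend on the Hive version, so check where the staging directories actually appear before relying on it.

```sql
-- Print the current value. The default is .hive-staging, which is
-- created inside the table location.
SET hive.exec.stagingdir;

-- Assumed example path: an absolute path is meant to keep staging
-- directories out of the table directory itself.
SET hive.exec.stagingdir=/tmp/hive/.hive-staging;
```

With the staging directory outside the table location, a concurrent "INSERT OVERWRITE" that clears the table directory should no longer remove another job's staging files. Two jobs overwriting the same table at the same time can still leave the final table contents unpredictable, so resolution 3 remains worth doing.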