Avoiding Data Duplication when Loading Data from M

Posted 2019-08-08 02:36

I have a dozen web servers, each writing data to a log file. At the beginning of each hour, the data from the previous hour is loaded into Hive by a cron script running the command:

hive -e "LOAD DATA LOCAL INPATH 'myfile.log' INTO TABLE my_table PARTITION(dt='2015-08-17-05')"

In some cases the command fails and exits with a non-zero code, in which case our script waits and tries again. The problem is that in some of these failures the data is actually loaded despite the failure message, so the retry loads it a second time. How can I know for sure whether or not the data has been loaded?
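The retry wrapper is, in simplified form, something like this (the sleep interval is illustrative):

    # simplified sketch of the retry logic; the sleep interval is illustrative
    until hive -e "LOAD DATA LOCAL INPATH 'myfile.log' INTO TABLE my_table PARTITION(dt='2015-08-17-05')"
    do
        sleep 60   # non-zero exit code: wait, then try again
    done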

Example for such a "failure" where the data is loaded:

Loading data to table default.my_table partition (dt=2015-08-17-05)
Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter partition.
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask

Edit: Alternatively, is there a way to query Hive for the filenames loaded into it? I can use DESCRIBE to see the number of files. Can I know their names?
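For example, something like this reports a numFiles count for the partition, but no file names:

    hive -e "DESCRIBE FORMATTED my_table PARTITION (dt='2015-08-17-05')"
    # "Partition Parameters" includes numFiles, but not which files they are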

Tags: hadoop hive
2 Answers
Evening l夕情丶
#2 · 2019-08-08 02:50

About "which files have been loaded in a partition":

  • if you had used an EXTERNAL TABLE and just uploaded your raw data files into the HDFS directory mapped to LOCATION, then you could either
    (a) run hdfs dfs -ls on that directory from the command line (or use the equivalent Java API call), or
    (b) run a Hive query such as select distinct INPUT__FILE__NAME from (...) (a sketch of both follows this list)

  • but in your case, you copy the data into a "managed" table, so there is no way to retrieve the data lineage (i.e. which log file was used to create each managed datafile)
  • ...unless you explicitly add the original file name inside the log file, of course (either in a "special" header record, or at the beginning of each record, which can be done with good old sed)
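A minimal sketch of options (a) and (b), assuming a hypothetical external table my_logs_ext whose LOCATION is an hourly log directory (both names are made up):

    # (a) list the raw files straight from HDFS (the path is illustrative)
    hdfs dfs -ls /data/logs/2015-08-17-05

    # (b) ask Hive which files back the table, via the INPUT__FILE__NAME pseudo-column
    hive -e "SELECT DISTINCT INPUT__FILE__NAME FROM (SELECT INPUT__FILE__NAME FROM my_logs_ext) t"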

About "how to automagically avoid duplication on INSERT": there is a way, but it would require quite a bit of re-engineering, and would cost you in terms of processing time /(extra Map step plus MapJoin)/...

  1. map your log file to an EXTERNAL TABLE so that you can run an INSERT-SELECT query
  2. record the original file name in your managed table, using the INPUT__FILE__NAME pseudo-column as the source
  3. add a WHERE NOT EXISTS clause with a correlated sub-query, so that if the source file name is already present in the target, nothing more is loaded

    INSERT INTO TABLE Target
    SELECT ColA, ColB, ColC, INPUT__FILE__NAME AS SrcFileName
    FROM Source src
    WHERE NOT EXISTS
      (SELECT DISTINCT 1
       FROM Target trg
       WHERE trg.SrcFileName = src.INPUT__FILE__NAME)

    Note the silly DISTINCT that is actually required to avoid blowing away the RAM in your Mappers; it would be useless with a mature DBMS like Oracle, but the Hive optimizer is still rather crude...

【Aperson】
#3 · 2019-08-08 03:12

I don't believe you can do this simply in Hadoop/Hive, so here are the basics of an implementation in Python:

import subprocess

# run the count query through the Hive CLI and capture its stdout
x = subprocess.check_output(
    ["hive", "-e", "select count(*) from my_table where dt='2015-08-17-05'"])
print(type(x))
print(x)

But you may have to spend some time fighting with backslashes and quoting to get hive -e working from Python; it can be fiddly. It may be easier to write that simple query to a file first and then use hive -f filename (see the sketch at the end of this answer). Print the output of subprocess.check_output to see how the output is stored; you may need some regex or a type conversion, but it should come back as a string. Then simply use an if statement:

# check_output returns the count as a string (bytes on Python 3)
if int(x.strip()) > 0:
    pass   # the partition already has data, so skip the load
else:
    subprocess.call(["hive", "-e",
        "LOAD DATA LOCAL INPATH 'myfile.log' INTO TABLE my_table PARTITION(dt='2015-08-17-05')"])