Sqoop Incremental Import

Posted 2020-05-14 04:19

Question:

I need advice on Sqoop incremental imports. Say a customer has Policy 1 on Day 1, and I import those records into HDFS on Day 1 and see them in the part files.
On Day 2, the same customer adds Policy 2. After the incremental Sqoop import runs, will the part files contain only the new records? If so, how do I get both the old records and the incrementally appended or last-modified records using Sqoop?

Answer 1:

Consider a table with 3 records that you have already imported into HDFS using Sqoop:

+------+------------+----------+------+------------+
| sid  | city       | state    | rank | rDate      |
+------+------------+----------+------+------------+
|  101 | Chicago    | Illinois |    1 | 2014-01-25 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |
+------+------------+----------+------+------------+

sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P

Now the table has additional records, but no updates to existing records:

+------+------------+----------+------+------------+
| sid  | city       | state    | rank | rDate      |
+------+------------+----------+------+------------+
|  101 | Chicago    | Illinois |    1 | 2014-01-25 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |
|  103 | Charlotte  | NC       |    9 | 2013-04-22 |
|  103 | Greenville | SC       |    9 | 2013-05-12 |
|  103 | Atlanta    | GA       |   11 | 2013-08-21 |
+------+------------+----------+------+------------+

Here you should use --incremental append together with --check-column, which specifies the column Sqoop examines when determining which rows to import.

sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --check-column rank --incremental append --last-value 7

The above command imports all the new rows, that is, the rows whose rank is greater than the last value, 7.
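
To make the selection rule concrete: in append mode Sqoop imports only the rows whose check column is greater than --last-value. The sketch below mimics that filter locally with awk on the sample rows above. It does not call Sqoop; the helper name select_new_rows and the CSV layout are assumptions made purely for illustration.

```shell
#!/bin/sh
# Sample rows from the table above as CSV: sid,city,state,rank,rDate
rows='101,Chicago,Illinois,1,2014-01-25
101,Schaumburg,Illinois,3,2014-01-25
101,Columbus,Ohio,7,2014-01-25
103,Charlotte,NC,9,2013-04-22
103,Greenville,SC,9,2013-05-12
103,Atlanta,GA,11,2013-08-21'

# Hypothetical helper: keep rows whose 4th field (rank) is strictly
# greater than the given last value, as append mode does.
select_new_rows() {
  printf '%s\n' "$rows" | awk -F, -v last="$1" '$4 + 0 > last + 0'
}

# Rows an append-mode run with --last-value 7 would pick up.
select_new_rows 7

# Largest rank among those rows: the value the next run would pass
# as --last-value.
select_new_rows 7 | awk -F, '$4 + 0 > max { max = $4 + 0 } END { print max }'
```

With the sample data this selects the three rows with sid 103 and reports 11 as the next last value.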

Now consider a second case, where existing rows have been updated:

+------+------------+----------+------+------------+
| sid  | city       | state    | rank | rDate      |
+------+------------+----------+------+------------+
|  101 | Chicago    | Illinois |    1 | 2015-01-01 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |
|  103 | Charlotte  | NC       |    9 | 2013-04-22 |
|  103 | Greenville | SC       |    9 | 2013-05-12 |
|  103 | Atlanta    | GA       |   11 | 2013-08-21 |
|  104 | Dallas     | Texas    |    4 | 2015-02-02 |
|  105 | Phoenix    | Arizona  |   17 | 2015-02-24 |
+------+------------+----------+------+------------+

Here we use --incremental lastmodified, which fetches the rows that were added or updated, based on the date column rDate.

sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --check-column rDate --incremental lastmodified --last-value 2014-01-25 --target-dir yloc/loc


Answer 2:

In answer to your first question, it depends on how you run the import. With --incremental append you specify the --check-column and --last-value arguments. These dictate exactly which records are pulled, and those records are simply appended to your table. For example, you could specify a DATE column for --check-column and a very early date (such as '1900-01-01', or Day 1 in your case) for --last-value; every run would then append everything in the source table to your destination, creating duplicate rows. In that case the new part files hold both new and old records. An increasing ID column with a small ID supplied each time would have the same effect. However, if --last-value is Day 2, the additional part files contain only new records. In case you were wondering whether you would lose the old records: you would not.

The lastmodified mode of --incremental is only useful if you later go back and update some attributes of an existing row. In that case it replaces the old data in your table with the updated version of the row that is now in your source table, and adds the new rows. Hope this helps!

All of this is based on Section 7.2.7 of the Sqoop User Guide, https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_incremental_imports

and on Chapter 3 of the Apache Sqoop Cookbook (that chapter is actually fantastic!)



Answer 3:

Step 1: The entire table is imported. It is available as a part-m file in your specified HDFS location (say /user/abc/def/part-m-00000).
Step 2: Only the incremental records are imported. They are available in another location (say /user/abc/def1/part-m-00000).

Now that both datasets are available, you can use the sqoop merge tool to consolidate them based on the key column.

Refer to the documentation below for more details:

https://sqoop.apache.org/docs/1.4.3/SqoopUserGuide.html#_literal_sqoop_merge_literal
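
As a rough illustration of what merging on a key column means: for every key, the record from the newer dataset wins, and keys that appear in only one dataset are kept. The sketch below emulates that outcome with awk on two small CSV strings. It does not invoke sqoop merge (which takes the two datasets through --new-data and --onto along with a --merge-key), and the helper name merge_on_key and the sample rows are assumptions.

```shell
#!/bin/sh
# Day 1 full import and Day 2 incremental import as CSV: custid,policy
old_data='101,1
102,2
103,3'
new_data='101,11
104,4'

# Hypothetical helper: feed the older records first and the newer ones
# second, so that for each key (field 1) the later, newer line overwrites
# the earlier one. The result is sorted by key for readability.
merge_on_key() {
  printf '%s\n%s\n' "$1" "$2" |
    awk -F, '{ rec[$1] = $0 } END { for (k in rec) print rec[k] }' |
    sort -t, -k1,1n
}

merge_on_key "$old_data" "$new_data"
```

Here custid 101 ends up with its updated policy value, 102 and 103 are carried over unchanged, and 104 is added.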



Answer 4:

Let's take an example. You have a customer table with two columns, custid and Policy; custid is the primary key, and you want to import only the rows that come after custid 100.

Scenario 1: append new data on the basis of the custid field

Phase 1:

The 3 records below were recently inserted into the customer table, and we want to import them into HDFS:

| custid | Policy |
| 101    | 1      |
| 102    | 2      |
| 103    | 3      |

Here is the Sqoop command for that (note that nothing may follow a trailing backslash, not even a space):

sqoop import \
--connect jdbc:mysql://localhost:3306/db \
--username root -P \
--table customer \
--target-dir /user/hive/warehouse/<your db>/<table> \
--append \
--check-column custid \
--incremental append \
--last-value 100

Phase 2: The 4 records below were recently inserted into the customer table, and we want to import them into HDFS:

| custid | Policy |
| 104    | 4      |
| 105    | 5      |
| 106    | 6      |
| 107    | 7      |

Here is the Sqoop command for that:

sqoop import \
--connect jdbc:mysql://localhost:3306/db \
--username root -P \
--table customer \
--target-dir /user/hive/warehouse/<your db>/<table> \
--append \
--check-column custid \
--incremental append \
--last-value 103

So these are the four options to consider when inserting new records:

--append \
--check-column <primary key> \
--incremental append \
--last-value <Last Value of primary key which sqoop job has inserted in last run>
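
Sqoop prints the value to use as the next --last-value at the end of an incremental import, and a saved job created with sqoop job retains it between runs. If neither is at hand, one fallback is to derive the value from the data already imported. The sketch below computes the largest custid from local stand-ins for the part files; the temporary directory, the file contents and the helper name max_custid are assumptions for illustration.

```shell
#!/bin/sh
# Stand-ins for part files fetched from HDFS (hypothetical local directory).
workdir=$(mktemp -d)
printf '101,1\n102,2\n103,3\n' > "$workdir/part-m-00000"
printf '104,4\n105,5\n106,6\n107,7\n' > "$workdir/part-m-00001"

# Hypothetical helper: print the largest value found in the first field
# (custid) across all part files in the given directory.
max_custid() {
  cat "$1"/part-m-* |
    awk -F, '$1 + 0 > max { max = $1 + 0 } END { print max + 0 }'
}

last_value=$(max_custid "$workdir")
echo "next run: --check-column custid --incremental append --last-value $last_value"
```

After phase 1 and phase 2 above, this yields 107, which matches the value the next append run would need.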

Scenario 2: append new data and update existing data on the basis of the custid field

Below, 1 new record with custid 108 has been inserted, and custid 101 and 102 have recently been updated in the customer table; we want to import all of them into HDFS:

| custid | Policy |
| 108    | 8      |
| 101    | 11     |
| 102    | 12     |

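sqoop import \
--connect jdbc:mysql://localhost:3306/db \
--username root -P \
--table customer \
--target-dir /user/hive/warehouse/<your db>/<table> \
--merge-key custid \
--check-column <last-modified timestamp column> \
--incremental lastmodified \
--last-value <timestamp of the last run>

Note that lastmodified mode compares a date or timestamp column, not the primary key: a --check-column of custid with --last-value 107 would at best pick up the new row 108, but not the updates to 101 and 102. The table therefore needs a column that is set on every update. Passing --merge-key (rather than --append) tells Sqoop to replace the old versions of rows 101 and 102 instead of appending duplicates.

So these are the four options to consider for inserting and updating records in the same command:

--merge-key <primary key> \
--check-column <last-modified timestamp column> \
--incremental lastmodified \
--last-value <Timestamp of the last run of the sqoop job>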

I specifically mention the primary key because, if the table does not have one, a few more options need to be considered:

By default multiple mappers run the Sqoop job, so the data has to be split among them on the basis of some key. Therefore

either we explicitly pass -m 1, so that only one mapper performs the operation,

or we specify another column (using the Sqoop option --split-by) by which the data can be divided among the mappers.
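
To see why a split column matters: Sqoop looks up the minimum and maximum of that column and hands each mapper one slice of the range. The sketch below reproduces the even split described in the Sqoop User Guide for ids 0 to 1000 and 4 mappers. The helper name split_ranges is hypothetical, and the real splitter may treat remainders and non-integer columns differently.

```shell
#!/bin/sh
# Hypothetical helper: print "lo hi" bounds, one line per mapper, for
# per-mapper queries of the form: WHERE id >= lo AND id < hi
split_ranges() {
  min=$1; max=$2; mappers=$3
  step=$(( (max - min) / mappers ))
  i=0
  while [ "$i" -lt "$mappers" ]; do
    lo=$(( min + i * step ))
    if [ "$i" -eq $(( mappers - 1 )) ]; then
      hi=$(( max + 1 ))   # last slice is widened so that max itself is included
    else
      hi=$(( lo + step ))
    fi
    echo "$lo $hi"
    i=$(( i + 1 ))
  done
}

# Four mappers over ids 0..1000
split_ranges 0 1000 4
```

With a single mapper (the -m 1 case) the same helper returns one slice covering the whole range, which is why no split column is needed then.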



Answer 5:

You could also try a free-form query that is altered based on a specific condition. You could write Java code using the Sqoop client to do the same: How to use Sqoop in Java Program?



Answer 6:

Here's a step-by-step guide for Sqoop incremental imports.

As an overview: use append mode only when the rows in your source table are never updated, or when you don't care about the updates; use lastmodified when you also want to update the data that has already been imported.



Answer 7:

In such use cases, for incremental append always look for a field that is genuinely incremental in nature. For lastmodified, the best-suited field is modified_date or a similar field that records when a row changed since you last sqoop-ed it. Only those changed rows will be updated; adding newer rows to your HDFS location requires incremental append.



Answer 8:

There are already great responses here. Along with these, you could also try the Sqoop query approach: you can customize your query with a condition that retrieves the updated records.

STEP 1: Importing New Records from the Database Table:

Example 1:

$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /tmp/MyNewloc

Example 2:

sqoop import --connect "jdbc:jtds:sqlserver://MYPD22:1333;databaseName=myDb" --target-dir /tmp/MyNewloc --fields-terminated-by \| --username xxx --password='xxx' --query "select * from Policy_Table where Policy_ID > 1 AND \$CONDITIONS" -m 1

Don't forget to supply $CONDITIONS in the WHERE clause.

Please refer to Sqoop Free Form Import.
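
The two examples quote the query differently, and that matters for $CONDITIONS: Sqoop has to receive the literal token so that each import process can replace it with its own condition. The sketch below shows that single quotes and an escaped dollar sign in double quotes produce the same string, and then imitates the substitution with sed. The helper name fill_conditions and the range it inserts are illustrative assumptions, not the exact expression Sqoop generates.

```shell
#!/bin/sh
# Single quotes stop the shell from expanding $CONDITIONS, so the literal
# token survives. Inside double quotes it must be written as \$CONDITIONS.
query_single='select * from Policy_Table where Policy_ID > 1 AND $CONDITIONS'
query_double="select * from Policy_Table where Policy_ID > 1 AND \$CONDITIONS"

# Hypothetical helper: replace the token with a per-mapper condition,
# roughly what each import process does before running its query.
fill_conditions() {
  printf '%s\n' "$1" | sed "s/\\\$CONDITIONS/$2/"
}

# Both quoting styles yield the same query text.
[ "$query_single" = "$query_double" ] && echo "quoting styles match"

fill_conditions "$query_single" 'Policy_ID >= 2 AND Policy_ID < 50'
```

If the dollar sign is left unescaped inside double quotes, the shell substitutes an (usually empty) variable before Sqoop ever sees the query, and the token is lost.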

STEP 2: Merging part-m files of both base table (original data) & New Table (New Records)

You could do this using 2 methods.

Method 1 - Using Sqoop Merge

Method 2 - Copying newly generated part-m files into original table target directory. (Copy part-m files from /tmp/MyNewloc to /tmp/MyOriginalLoc/)
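
One thing to watch with Method 2 is that both directories usually contain files with the same names (part-m-00000 and so on), so a plain copy could overwrite the original data. The sketch below illustrates copying under a prefixed name, using local temporary directories as stand-ins for the two HDFS locations; on a cluster the analogous steps would go through hdfs dfs -cp. The helper name copy_parts, the day2 prefix and the sample rows are made up for illustration.

```shell
#!/bin/sh
# Local stand-ins for the two HDFS directories (hypothetical layout).
base=$(mktemp -d)
mkdir "$base/MyOriginalLoc" "$base/MyNewloc"
printf '1|Alice\n' > "$base/MyOriginalLoc/part-m-00000"
printf '2|Bob\n'   > "$base/MyNewloc/part-m-00000"

# Hypothetical helper: copy every part file from the source directory ($1)
# into the destination ($2) under a prefixed name ($3), so that it cannot
# overwrite an existing file with the same part-m-NNNNN name.
copy_parts() {
  for f in "$1"/part-m-*; do
    cp "$f" "$2/$3-$(basename "$f")"
  done
}

copy_parts "$base/MyNewloc" "$base/MyOriginalLoc" day2
ls "$base/MyOriginalLoc"
```

The destination then holds both the original part file and the renamed new one, which is the state the Hive table in the next step expects.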

STEP 3: CREATING HIVE TABLE

1) Now create a Hive table whose location is the original table's target directory, which contains both the original part-m files and the part-m files with the new records.

CREATE EXTERNAL TABLE IF NOT EXISTS Policy_Table(
Policy_ID string,
Customer_Name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/tmp/MyOriginalLoc/';


Tags: sqoop