Hive creates multiple small files for each insert into HDFS

Posted 2020-02-08 09:18

The following has already been achieved:

  1. A Kafka producer pulling data from Twitter using Spark Streaming.
  2. A Kafka consumer ingesting data into a Hive external table (on HDFS).

While this is working fine so far, there is one issue I am facing: when my app inserts data into the Hive table, it creates one small file per row of data.

Below is the code:

  // Define which topic to read from
  val topic = "topic_twitter"
  val groupId = "group-1"
  val consumer = KafkaConsumer(topic, groupId, "localhost:2181")

  // Create SparkContext
  val sparkContext = new SparkContext("local[2]", "KafkaConsumer")

  // Create HiveContext
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")

The Hive demo table is already populated with a single record. The Kafka consumer loops through the data for topic = "topic_twitter", processing each row and inserting it into the Hive table:

// Workaround for the missing INSERT ... VALUES: STACK(1, ...) builds a one-row result
// from the single-row demo table. The string values (tweetText, userName, etc.) must
// already be quoted and escaped so they form valid HiveQL literals.
val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK( 1," +
    tweetID        + "," +
    tweetText      + "," +
    userName       + "," +
    tweetTimeStamp + "," +
    userLang + ") FROM demo LIMIT 1"

hiveContext.sql(hiveSql)

Below are screenshots from my Hadoop environment.

[screenshot: twitter_data and demo Hive table directories in HDFS]

[screenshot: last 10 files created in HDFS]

As you can see, no file is larger than 200 KB. Is there a way to merge these files into one file?

3 Answers
一纸荒年 Trace。
Answer #2 · 2020-02-08 09:58

[take 2] OK, so you can't properly "stream" data into Hive. But you can add a periodic compaction post-processing job...

  • create your table with 3 partitions, e.g. (role='activeA'), (role='activeB'), (role='archive')
  • point your Spark inserts at (role='activeA')
  • at some point, switch to (role='activeB')
  • then dump every record you have collected in the "A" partition into "archive", hoping that the default Hive config will do a decent job of limiting fragmentation:

    INSERT INTO TABLE twitter_data PARTITION (role='archive')
    SELECT ... FROM twitter_data WHERE role='activeA' ;
    TRUNCATE TABLE twitter_data PARTITION (role='activeA') ;

  • at some point, switch back to "A", and so on (a sketch of the full rotation follows below)
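
A minimal HiveQL sketch of that rotation, assuming the landing table is re-created as a managed table partitioned by a role column (the schema mirrors the question, but the partitioned layout and exact statements are this answer's suggestion, not the asker's existing setup):

    -- Managed (non-external) table, so TRUNCATE ... PARTITION is allowed
    CREATE TABLE IF NOT EXISTS twitter_data (
        tweetId BIGINT, tweetText STRING, userName STRING,
        tweetTimeStamp STRING, userLang STRING)
    PARTITIONED BY (role STRING);

    -- Spark inserts land in the currently "active" partition, e.g.
    -- INSERT INTO TABLE twitter_data PARTITION (role='activeA') SELECT ...

    -- Periodic compaction job: copy the collected rows into "archive",
    -- then empty the partition so the next cycle starts clean
    INSERT INTO TABLE twitter_data PARTITION (role='archive')
    SELECT tweetId, tweetText, userName, tweetTimeStamp, userLang
    FROM twitter_data WHERE role='activeA';

    TRUNCATE TABLE twitter_data PARTITION (role='activeA');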

One last word: if Hive still creates too many files on each compaction job, try tweaking some parameters in your session just before the INSERT, e.g.

    set hive.merge.mapfiles=true;
    set hive.merge.mapredfiles=true;
    set hive.merge.smallfiles.avgsize=1024000000;
老娘就宠你
Answer #3 · 2020-02-08 10:01

Hive was designed for massive batch processing, not for transactions. That's why you get at least one data file for each LOAD or INSERT-SELECT command. It's also why there was no INSERT ... VALUES command, hence the clumsy STACK syntax shown in your post as a necessary workaround.

Well... that was true until transaction support was introduced. In a nutshell, you need (a) Hive 0.14 or later, (b) an ORC table, and (c) transaction support enabled on that table (i.e. locks, periodic background compaction, etc.).

The wiki page on Streaming Data Ingest in Hive might be a good starting point.
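
As a rough sketch of what that setup looks like in HiveQL (the session settings and table name below are illustrative and should be checked against your Hive version; they are not taken from the question):

    -- Enable ACID support for the session (usually also configured in hive-site.xml)
    set hive.support.concurrency=true;
    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    set hive.compactor.initiator.on=true;
    set hive.compactor.worker.threads=1;

    -- Transactional table: must be ORC and managed; bucketing is required on older Hive versions
    CREATE TABLE twitter_data_acid (
        tweetId BIGINT, tweetText STRING, userName STRING,
        tweetTimeStamp STRING, userLang STRING)
    CLUSTERED BY (tweetId) INTO 4 BUCKETS
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true');

Note that, as far as I know, a Spark HiveContext cannot write into transactional tables directly; the inserts would have to go through Hive itself (e.g. beeline) or the Hive Streaming API mentioned above.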

来,给爷笑一个
Answer #4 · 2020-02-08 10:09

You can use these options together:

  1. Turn on ACID.
  2. Create an ORC table K with the transactional property.
  3. Insert into K many times, either via streaming or with plain INSERT DML.
  4. Hive will automatically create small delta files.
  5. Minor or major compactions will happen.
  6. The small files will be merged into larger files (see the sketch below).
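
If the automatic compactions don't kick in soon enough, they can also be requested and monitored manually. A small sketch, assuming the transactional table is named twitter_data_acid as in the earlier example (an illustrative name, not from the question):

    -- Explicitly request a compaction of the table
    ALTER TABLE twitter_data_acid COMPACT 'minor';   -- merge delta files together
    ALTER TABLE twitter_data_acid COMPACT 'major';   -- rewrite base + deltas into a new base

    -- Check the queue of running/completed compactions
    SHOW COMPACTIONS;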