星火插入到数据帧HBase的(Insert Spark dataframe into hbase)

2019-09-28 15:31发布

我有一个数据帧,我想将其插入到HBase的。 我遵循这个documenation 。

这是我的数据框什么样子:

 --------------------
|id | name | address |
|--------------------|
|23 |marry |france   |
|--------------------|
|87 |zied  |italie   |
 --------------------

创建使用此代码HBase的表:

val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
          print("-----------------------------------------------------------------------------------------------------------")
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
          admin.createTable(tableDesc)
        }else{
          print("Table already exists!!--------------------------------------------------------------------------------------")
        }

我现在该怎么可以将这个数据帧到HBase的?

在另一个例子中,我成功使用此代码插入到HBase的:

val myTable = new HTable(conf, tableName)
    for (i <- 0 to 1000) {
      var p = new Put(Bytes.toBytes(""+i))
      p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
      p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
      p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
      p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
      myTable.put(p)
    }
    myTable.flushCommits()

但现在我卡住了,怎么我的数据帧中的每个记录插入到我的HBase的表。

感谢您的时间和精力

Answer 1:

使用代码格式化的目的文件告诉答案:

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
 .format("org.apache.hadoop.hbase.spark ")
 .save()

其中sc.parallelize(数据).toDF是你的数据帧。 文档例如使用sc.parallelize(数据)变为阶集合数据帧.toDF

你已经拥有你的数据帧,只是试图调用

yourDataFrame.write.options(
     Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
     .format("org.apache.hadoop.hbase.spark ")
     .save()

它应该工作。 文件是相当清楚的...

UPD

考虑到与特定模式的数据框,上面会创建5个地区的HBase的表,里面保存数据帧。 请注意,如果没有指定HBaseTableCatalog.newTable时,该表必须预先创建。

这是关于数据分区。 每个HBase的表可以有1点... X的区域。 你应该仔细挑选区域的数目。 低区号码是坏的。 高区号码也很糟糕。



Answer 2:

一个替代是看rdd.saveAsNewAPIHadoopDataset,将数据插入到HBase的表。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "ip's")
    config.set("hbase.zookeeper.property.clientPort","2181")
    config.set(TableInputFormat.INPUT_TABLE, "tableName")

    val newAPIJobConfiguration1 = Job.getInstance(config)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

    val hbasePuts= df.rdd.map((row: Row) => {
      val  put = new Put(Bytes.toBytes(row.getString(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
      put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
      (new ImmutableBytesWritable(), put)
    })

    hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
    }

参考: https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/



文章来源: Insert Spark dataframe into hbase