Streaming to HBase with PySpark

Posted 2019-04-10 21:58

There is a fair amount of info online about bulk loading to HBase with Spark streaming using Scala (these two were particularly useful) and some info for Java, but there seems to be a lack of info for doing it with PySpark. So my questions are:

  • How can data be bulk loaded into HBase using PySpark?
  • Most examples in any language only show a single column per row being upserted. How can I upsert multiple columns per row?

The code I currently have is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input")

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()

What I need help with is the bulk load function:

def bulk_load(rdd):
    #???

I've made some progress previously, with many and various errors (as documented here and here).

1 Answer

倾城 Initia · 2019-04-10 22:37

So after much trial and error, I present here the best I have come up with. It works well and successfully bulk loads data (using either Puts or HFiles). I am perfectly willing to believe that it is not the best method, so any comments/other answers are welcome. This assumes you're using a CSV for your data.

Bulk loading with Puts

By far the easiest way to bulk load, this simply creates a Put request for each cell in the CSV and queues them up to HBase.

def bulk_load(rdd):
    #Your configuration will likely be different. Insert your own quorum, parent znode and table name
    conf = {"hbase.zookeeper.quorum": "localhost:2181",
            "zookeeper.znode.parent": "/hbase-unsecure",
            "hbase.mapred.outputtable": "Test",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    #Split the input into individual lines, then convert each CSV line to key-value pairs
    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)
    load_rdd.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

The function csv_to_key_value is where the magic happens:

def csv_to_key_value(row):
    cols = row.split(",")  #Split on commas
    #Each cell becomes a tuple of (key, [key, column-family, column-qualifier, value])
    #This example assumes a row key plus three value columns; extend the same pattern for more
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result

The value converter we defined earlier will convert these tuples into HBase Puts.
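
For example, a single CSV line turns into one tuple per column. A quick illustration with a hypothetical row key r1 and three values, matching the hard-coded families and qualifiers above:

print(csv_to_key_value("r1,10,20,30"))
# (('r1', ['r1', 'f1', 'c1', '10']),
#  ('r1', ['r1', 'f2', 'c2', '20']),
#  ('r1', ['r1', 'f3', 'c3', '30']))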

Bulk loading with HFiles

Bulk loading with HFiles is more efficient: rather than a Put request for each cell, an HFile is written directly and the RegionServer is simply told to point to the new HFile. This will use Py4J, so before the Python code we have to write a small Java program:

import py4j.GatewayServer;
//HBase isn't referenced directly here, but it (and its dependencies) must be on this JVM's
//classpath so the Python side can reach the HBase classes through gateway.jvm
import org.apache.hadoop.hbase.*;

public class GatewayApplication {

    public static void main(String[] args)
    {
        //Start a Py4J gateway so the Python driver can call into this JVM
        GatewayApplication app = new GatewayApplication();
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}

Compile this and run it, and leave it running for as long as your streaming job is running.
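
Once the gateway is running, you can sanity-check the connection from Python before wiring it into Spark. A minimal sketch, assuming the gateway is listening on Py4J's default port (25333) on the same machine:

from py4j.java_gateway import JavaGateway

#Connect to the GatewayServer started above and touch a JVM class to confirm the link works
gateway = JavaGateway()
print(gateway.jvm.java.lang.System.getProperty("java.version"))

With the gateway reachable, update bulk_load as follows: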

from py4j.java_gateway import JavaGateway

def bulk_load(rdd):
    #The output format class changes; everything else stays the same
    conf = {"hbase.zookeeper.quorum": "localhost:2181",
            "zookeeper.znode.parent": "/hbase-unsecure",
            "hbase.mapred.outputtable": "Test",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)
    #Don't process empty RDDs
    if not load_rdd.isEmpty():
        #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
        #startTime is a string that uniquely identifies this batch (defined elsewhere),
        #so each batch writes its HFiles to a fresh directory
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
        #The files have now been written, but HBase doesn't know about them yet

        #Get a link to the Py4J gateway
        gateway = JavaGateway()
        #Convert conf to a fully fledged Configuration object
        config = dict_to_conf(conf)
        #Set up our HTable
        htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
        #Set up the path to the newly written HFiles
        path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
        #Get a bulk loader
        loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
        #Load the HFiles
        loader.doBulkLoad(path, htable)
    else:
        print("Nothing to process")

Finally, the fairly straightforward dict_to_conf:

def dict_to_conf(conf):
    gateway = JavaGateway()
    config = gateway.jvm.org.apache.hadoop.conf.Configuration()
    #Copy each entry of the Python dict into the Hadoop Configuration
    for key, value in conf.items():
        config.set(key, value)
    return config

As you can see, bulk loading with HFiles is more complex than using Puts, but depending on your data load it is probably worth it, and once you get it working it's not that difficult.

One last note on something that caught me off guard: HFiles expect the key-value pairs they receive to be in lexical order. Your input does not always guarantee this, and lexical order can be surprising for numeric keys, since lexically "10" < "9". If you have designed your keys to be unique, this is easily fixed:

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
              .flatMap(csv_to_key_value)\
              .sortByKey(True)  #Sort keys in ascending (lexical) order
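
A quick illustration of the pitfall in plain Python (no Spark needed); zero-padding the keys is one hypothetical way to make lexical and numeric order agree:

print(sorted(["9", "10", "2"]))    # ['10', '2', '9'] -- lexical order, not numeric
print(sorted(["02", "09", "10"]))  # ['02', '09', '10'] -- zero-padded keys sort as expected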