I am using PySpark (Spark 2.3.1) with HBase 1.2.1, and I want to know what the best way to access HBase from PySpark might be.
I did some initial searching and found that there are a few options available, such as using shc-core:1.1.1-2.1-s_2.11.jar, which can achieve this, but wherever I look for examples, in most places the code is written in Scala or the examples are also Scala based. I tried implementing basic code in PySpark:
from pyspark import SparkContext
from pyspark.sql import SQLContext
def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())

    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()
# entry point for PySpark application
if __name__ == '__main__':
main()
and ran it using:
spark-submit --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py
It returns empty output:
+---------+
|secondcol|
+---------+
+---------+
I am not sure what exactly I am doing wrong, and I also don't know what the best way of doing this would be.
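As a minimal sanity check (a sketch only; it reuses the sqlc, catalog, and data_source_format defined in the code above), the mapped schema and row count can be printed to confirm whether SHC is returning any rows at all:

    # Sketch: drop these lines into main() after load() to see what SHC actually maps.
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.printSchema()    # should list firstcol and secondcol as string columns
    print(df.count())   # 0 here means no rows are coming back from HBase at all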
Any references would be greatly appreciated.
Regards
Finally, using SHC, I was able to connect to HBase 1.2.1 with Spark 2.3.1 from PySpark code. Here is what worked for me:
All my Hadoop [NameNode, DataNode, NodeManager, ResourceManager] and HBase [HMaster, HRegionServer, HQuorumPeer] daemons were up and running on my EC2 instance.
I placed the emp.csv file at the HDFS location /test/emp.csv, with the following data:

key,empId,empName,empWeight
1,"E007","Bhupesh",115.10
2,"E008","Chauhan",110.23
3,"E009","Prithvi",90.0
4,"E0010","Raj",80.0
5,"E0011","Chauhan",100.0
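(The same file can also be produced with Spark itself instead of uploading it by hand; a sketch, with the path and column names assumed from above:)

    from pyspark.sql import SparkSession

    # Sketch only: build the same sample rows in memory and write them under /test/emp.csv.
    # Spark writes a directory of part files there, which spark.read.csv reads back the same way.
    spark = SparkSession.builder.appName("WriteSampleCsv").getOrCreate()
    rows = [(1, "E007", "Bhupesh", 115.10),
            (2, "E008", "Chauhan", 110.23),
            (3, "E009", "Prithvi", 90.0),
            (4, "E0010", "Raj", 80.0),
            (5, "E0011", "Chauhan", 100.0)]
    df = spark.createDataFrame(rows, ["key", "empId", "empName", "empWeight"])
    df.coalesce(1).write.option("header", "true").mode("overwrite").csv("/test/emp.csv")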
I created the readwriteHBase.py file with the following code [it reads the emp.csv file from HDFS, first creates tblEmployee in HBase and pushes the data into it, then reads some data back from the same table and displays it on the console]:
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()

    dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"

    writeCatalog = ''.join("""{
        "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"int"},
            "empId":{"cf":"personal", "col":"empId", "type":"string"},
            "empName":{"cf":"personal", "col":"empName", "type":"string"},
            "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
        }
    }""".split())

    writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
    print("csv file read", writeDF.show())

    writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
    print("csv file written to HBase")

    readCatalog = ''.join("""{
        "table":{"namespace":"default", "name":"tblEmployee"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"int"},
            "empId":{"cf":"personal", "col":"empId", "type":"string"},
            "empName":{"cf":"personal", "col":"empName", "type":"string"}
        }
    }""".split())

    print("going to read data from Hbase table")
    readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
    print("data read from HBase table")
    readDF.select("empId", "empName").show()
    readDF.show()

# entry point for PySpark application
if __name__ == '__main__':
    main()
Ran this script on the VM console using the command:
spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
Intermediate result: after reading the CSV file:
+---+-----+-------+---------+
|key|empId|empName|empWeight|
+---+-----+-------+---------+
|  1| E007|Bhupesh|    115.1|
|  2| E008|Chauhan|   110.23|
|  3| E009|Prithvi|     90.0|
|  4|E0010|    Raj|     80.0|
|  5|E0011|Chauhan|    100.0|
+---+-----+-------+---------+
Final output: after reading the data back from the HBase table:
+-----+-------+
|empId|empName|
+-----+-------+
| E007|Bhupesh|
| E008|Chauhan|
| E009|Prithvi|
|E0010|    Raj|
|E0011|Chauhan|
+-----+-------+
Note: while creating the HBase table and inserting data into it, it expects NumberOfRegions to be greater than 3, hence I have added options(catalog=writeCatalog, newtable=5) while writing the data to HBase.
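In isolation, the relevant write call is the one below (a sketch; writeDF, writeCatalog, and dataSourceFormat are the names defined in readwriteHBase.py above):

    # Sketch only: "newtable" is the number of regions SHC pre-splits a newly created
    # table into; as noted above, it is expected to be greater than 3.
    writeDF.write \
        .options(catalog=writeCatalog, newtable=5) \
        .format(dataSourceFormat) \
        .save()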