错误而从CSV文件中的数据映射到在HDFS一个蜂房表(Error while mapping the

2019-10-16 13:00发布

我想按照下面的步骤来加载数据帧到一个蜂巢表:

  1. 阅读源表和数据帧保存为HDFS上一个CSV文件

     val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load() 
  2. 订购列按我的蜂巢表列我蜂巢表列存在于格式的字符串:

     val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype val schemaList = hiveCols.split("\\|") val hiveColumnOrder = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq val finalDF = yearDF.selectExpr(hiveColumnOrder:_*) 

    我在“使用ExecQuery”读列是一样的“hiveColumnOrder”和公正,以确保订单的顺序,我选择再次使用selectExpr在yearDF列

  3. 保存数据帧作为HDFS CSV文件:

     newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/") 
  4. 一旦我保存数据帧,我采取相同的列从“hiveCols”,准备一个DDL在同一位置与值是逗号分隔的如下面给出创建配置单元表:

创建表,如果不存在schema.tablename(COL1 coldatatype,COL2 coldatatype,COL3 coldatatype,COL4 coldatatype ... col200数据类型)
行格式分隔的字段TERMINATED BY“”
存储为TEXTFILE

LOCATION 'HDFS://username/apps/hive/warehouse/database.db/lines_test_data56/';

之后我加载数据框到创建的表,我对着这里的问题是,当我查询表,我得到的查询输出不当。 对于例如:如果我申请的数据框下面的查询将其保存为一个文件之前:

finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2

我得到的输出正常。 所有值都正确对齐列:

[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]

但节能数据帧中的一个CSV文件后,创建的它(第四步)顶部的表,并应用在创建表我看到的数据是混乱,并与列不当映射相同的查询:

select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2

+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id     | line_num     | debit_rate  | debit_rate_text  | credit_rate  | credit_rate_text  | activity_amount  | activity_amount_text  | exchange_rate  | exchange_rate_text  | amount_cr  | amount_cr_text  |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598      | 2            | NULL        |                  | 381761.4    |                    | 5686.76          | 5686.76               | NULL           | -5686.76            | NULL       |                 |

所以,我想用在我创建的蜂巢表前期不同的方式,将数据插入从数据帧:

  • 在步骤4运行上面的DDL
  • finalDF.createOrReplaceTempView( “tmpTable”)
  • spark.sql( “插入schema.table SELECT * FROM tmpTable”)

如果我跑了上述选择查询,一旦工作完成,即使这样失败。 我尝试使用刷新表refresh table schema.tablemsckrepair table schema.table只是为了看看有没有与任何元数据的问题,但似乎没有任何锻炼。

谁能告诉我是什么原因造成这种现象,是有一个与我在这里操作数据的方式有任何问题?

Answer 1:

码被使用火花2.3.2测试


相反,从CSV文件创建星火据帧,然后把它注册为蜂巢表,您可以轻松地运行SQL命令,并创建CSV文件蜂巢表

val conf = new SparkConf
    conf
      .set("hive.server2.thrift.port", "10000")
      .set("spark.sql.hive.thriftServer.singleSession", "true")
      .set("spark.sql.warehouse.dir", "hdfs://PATH_FOR_HIVE_METADATA")
      .set("spark.sql.catalogImplementation","hive")
      .setMaster("local[*]")
      .setAppName("ThriftServer")

val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

现在,使用spark对象,你可以运行SQL命令,如蜂房的用户:

spark.sql("DROP DATABASE IF EXISTS my_db CASCADE")
spark.sql("create database if not exists my_db")
spark.sql("use my_db")

使用下面的代码,你可以加载在HDFS目录中的所有csv_files(或者你可以给一个准确CSV文件的路径):

spark.sql(
      "CREATE TABLE test_table(" +
        "id int," +
        "time_stamp bigint," +
        "user_name string) " +
        "ROW FORMAT DELIMITED " +
        "FIELDS TERMINATED BY ',' " +
        "STORED AS TEXTFILE " +
        "LOCATION 'hdfs://PATH_TO_CSV_Directory_OR_CSV_FILE' "
    )

并在年底寄存器星火sqlContext对象为蜂巢ThriftServer:

HiveThriftServer2.startWithContext(spark.sqlContext)

这将创建上的端口10000 ThriftServer端点。

INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads

现在你可以运行直线,并连接到ThriftServer:

beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: enter optional_username
Enter password for jdbc:hive2://localhost:10000: leave blank
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>

如果表测试test_table下创建my_db数据库:

0: jdbc:hive2://localhost:10000> use my_db;
0: jdbc:hive2://localhost:10000> show tables ;
+-----------+-----------------------+--------------+--+
| database  |       tableName       | isTemporary  |
+-----------+-----------------------+--------------+--+
| my_db     | test_table            | false        |
+-----------+-----------------------+--------------+--+

此外,您还可以创建使用ThrifServer JDBC端点的任何其他蜂巢表(或任何HiveQL命令)。

以下是所需的依赖:

 libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.8.3",
  "org.apache.hadoop" % "hadoop-common" % "2.8.3",
)


Answer 2:

我用rowformat SERDE:org.apache.hadoop.hive.serde2.OpenCSVSerde在蜂巢DDL。 这也有“”作为默认的分隔char和我没有给任何其他分隔符。



文章来源: Error while mapping the data from CSV file to a Hive table on HDFS