Create a Postgresql table from Avro Schema in Nifi

2019-02-20 12:23发布

Using InferAvroSchema I got an Avro Schema of my file. I want to create a table in PostregSql using this Avro schema. Which processor I have to use.

I use : GetFile->InferAvroSchema-> I want to create a table from this schema -> Put databaseRecord.

The avro schema :

{
  "type" : "record",
  "name" : "warranty",
  "doc" : "Schema generated by Kite",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "Type inferred from '1'"
  }, {
    "name" : "train_id",
    "type" : "long",
    "doc" : "Type inferred from '21691'"
  }, {
    "name" : "siemens_nr",
    "type" : "string",
    "doc" : "Type inferred from 'Loco-001'"
  }, {
    "name" : "uic_nr",
    "type" : "long",
    "doc" : "Type inferred from '193901'"
  }, {
    "name" : "Configuration",
    "type" : "string",
    "doc" : "Type inferred from 'ZP28'"
  }, {
    "name" : "Warranty_Status",
    "type" : "string",
    "doc" : "Type inferred from 'Out_of_Warranty'"
  }, {
    "name" : "Warranty_Data_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Real_based_on_preliminary_acceptance_date'"
  }, {
    "name" : "of_progression",
    "type" : "long",
    "doc" : "Type inferred from '100'"
  }, {
    "name" : "Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2009'"
  }, {
    "name" : "Warranty_on_Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2013'"
  }, {
    "name" : "Customer_Status",
    "type" : "string",
    "doc" : "Type inferred from 'homologation'"
  }, {
    "name" : "Commissioning_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/10/2010'"
  }, {
    "name" : "Preliminary_acceptance_date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_Start_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_End_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2013'"
  }, {
    "name" : "Effective_End_Warranty_Date",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'null'",
    "default" : null
  }, {
    "name" : "Level_2_in_function",
    "type" : "string",
    "doc" : "Type inferred from '17/07/2015'"
  }, {
    "name" : "Baseline",
    "type" : "string",
    "doc" : "Type inferred from '2.10.23.4'"
  }, {
    "name" : "RELN_revision",
    "type" : "string",
    "doc" : "Type inferred from '0434-26.3'"
  }, {
    "name" : "TC_report",
    "type" : "string",
    "doc" : "Type inferred from 'A480140'"
  }, {
    "name" : "Last_version_Date",
    "type" : "string",
    "doc" : "Type inferred from 'A-23/09/2015'"
  }, {
    "name" : "ETCS_ID_NID_Engine",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '13001'",
    "default" : null
  }, {
    "name" : "Item_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Item'"
  }, {
    "name" : "Path",
    "type" : "string",
    "doc" : "Type inferred from 'sites/TrWMTISnerc_Community/Lists/X4Trains'"
  } ]
}

and my table create table is :

Create table warranty(
  id    float,
  train_id float,
  siemens_nr    varchar(255),
  uic_nr    float,
  configuration varchar(255),
  warranty_status   varchar(255),
  warranty_data_type    varchar(255),
  of_progression    float,
  delivery_date varchar(255),
  warranty_on_delivery_date varchar(255),
  customer_status   varchar(255),
  commissioning_date    varchar(255),
  preliminary_acceptance_date   varchar(255),
  warranty_start_date   varchar(255),
  warranty_end_date varchar(255),
  effective_end_warranty_date   varchar(255),
  level_2_in_function   varchar(255),
  baseline  varchar(255),
  reln_revision varchar(255),
  tc_report varchar(255),
  last_version_Date varchar(255),
  etcs_id_nid_engine    float,
  item_type  varchar(255),
  path varchar(255)

)

1条回答
我只想做你的唯一
2楼-- · 2019-02-20 12:51

I can suggest ExecuteGroovyScript processor in nifi v1.5+

define new property SQL.mydb - you will be prompted to link its value to a database (DBCPConnectionPool)

choose the database where you want to create a table

and use this script (assume avro schema is in the flow file content)

import groovy.json.JsonSlurper

def ff = session.get()
if(!ff)return

//parse avro schema from flow file content
def schema = ff.read().withReader("UTF-8"){ new JsonSlurper().parse(it) }

//define type mapping
def typeMap = [
    "string"            : "varchar(255)",
    "long"              : "numeric(10)",
    [ "null", "string" ]: "varchar(255)",
    [ "null", "long" ]  : "numeric(10)",
]

assert schema.name && schema.name=~/^\w.*/

//build create table statement
def createTable = "create table ${schema.name} (" +
    schema.fields.collect{ "\n  ${it.name.padRight(39)} ${typeMap[it.type]}" }.join(',') +
    "\n)"

//execute statement through the custom defined property
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
SQL.mydb.execute(createTable as String) //important to cast to String

//transfer flow file to success
REL_SUCCESS << ff
查看更多
登录 后发表回答