How to map column-wise data in a flowfile in NiFi

Posted 2019-05-28 19:53

Question:

I have a CSV file with the following structure:

Alfreds,Centro,Ernst,Island,Bacchus
Germany,Mexico,Austria,UK,Canada
01,02,03,04,05

Now I have to move that data into a database as shown below:

Name,City,ID
Alfreds,Germany,01
Centro,Mexico,02
Ernst,Austria,03
Island,UK,04
Bacchus,Canada,05

I tried to map those columns, but I am not able to extract the data column-wise.

My input data is laid out column-wise, but I need to insert it row-wise into SQL Server.

Can anyone suggest a way to transform column-wise data into row-wise data for SQL Server?

Thanks

Answer 1:

There is no existing Apache NiFi processor that performs column transposition. One of the problems is that this is difficult to do in a streaming manner, which is how most NiFi components are designed, because a naïve implementation needs to hold the entire contents of the flowfile in memory at once.

I would recommend using an ExecuteScript processor to do this (here's a 6-line Python example). Be careful, because you can easily overflow your heap if it is not sized properly or if you read unexpectedly large files into memory.
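As a rough illustration of the in-memory approach, here is a minimal sketch in plain Python 3. The function name `transpose_csv` is hypothetical, and the ExecuteScript wiring (reading the flowfile content and writing the result back) is deliberately left out. It assumes every row has the same number of fields; `zip` silently truncates to the shortest row otherwise.

```python
import csv
import io


def transpose_csv(text, delimiter=","):
    """Return `text` with its rows and columns swapped (whole input held in memory)."""
    # Parse every non-empty row; this is the step that loads the full content.
    rows = [row for row in csv.reader(io.StringIO(text), delimiter=delimiter) if row]
    out = io.StringIO()
    writer = csv.writer(out, delimiter=delimiter, lineterminator="\n")
    # zip(*rows) groups the i-th field of every input row, i.e. one input column.
    writer.writerows(zip(*rows))
    return out.getvalue()


sample = (
    "Alfreds,Centro,Ernst,Island,Bacchus\n"
    "Germany,Mexico,Austria,UK,Canada\n"
    "01,02,03,04,05\n"
)
print(transpose_csv(sample), end="")
```

With the sample from the question, each output line holds one name, one country and one ID, which is the row-wise shape needed for the database insert.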

You could write a custom processor that performs a streaming transpose by iterating over each of the n rows, reading up to the next delimiter, storing a byte counter per row, combining the n elements into a single output row, and repeating the process from each row's stored byte counter. (Given m columns, this is O(m * n).)
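The byte-counter idea can be sketched outside NiFi in Python 3. This is only an illustration under several assumptions: the input is a seekable binary stream, the delimiter is a single byte, fields are not quoted, and all rows have the same number of fields. The names `row_offsets`, `read_field` and `streaming_transpose` are hypothetical, and a real custom processor would be written in Java against the NiFi API.

```python
import io


def row_offsets(stream, chunk_size=8192):
    """Scan once and return the byte offset at which each non-empty row starts."""
    offsets, pos, at_line_start = [], 0, True
    stream.seek(0)
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return offsets
        for i, b in enumerate(chunk):  # iterating bytes yields ints
            if at_line_start and b not in (10, 13):  # not "\n" or "\r"
                offsets.append(pos + i)
                at_line_start = False
            if b == 10:
                at_line_start = True
        pos += len(chunk)


def read_field(stream, offset, delimiter=b","):
    """Read one field starting at `offset`; return (field, next_offset, row_ended)."""
    stream.seek(offset)
    field = bytearray()
    while True:
        byte = stream.read(1)
        if byte in (b"", b"\n"):  # end of input or end of row
            return bytes(field), stream.tell(), True
        if byte == delimiter:
            return bytes(field), stream.tell(), False
        if byte != b"\r":
            field += byte


def streaming_transpose(stream, out, delimiter=b","):
    """Write the transpose of `stream` to `out`, holding one output row at a time."""
    cursors = row_offsets(stream)  # one byte counter per input row
    if not cursors:
        return
    while True:
        fields, ended = [], False
        for r, offset in enumerate(cursors):
            field, cursors[r], row_ended = read_field(stream, offset, delimiter)
            fields.append(field)
            ended = ended or row_ended
        out.write(delimiter.join(fields) + b"\n")
        if ended:  # assumes equal-length rows, so all rows end together
            return


data = b"Alfreds,Centro,Ernst\nGermany,Mexico,Austria\n01,02,03\n"
result = io.BytesIO()
streaming_transpose(io.BytesIO(data), result)
print(result.getvalue().decode(), end="")
```

Only the n byte counters and the current output row are kept in memory, at the cost of n seeks per output row.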

Another solution would be to split the CSV input into individual rows using the SplitText processor, use an ExecuteScript or custom processor to transpose each row into a column, and then use a custom merge operation (either extend the existing MergeContent processor or write a script to do this) that laterally concatenates the incoming columns into a reconstructed matrix. (O(n) + O(n) + O(m) => O(2n + m), but the individual transposition operations can be performed in parallel, so with x threads it's O(n + n/x + m).)
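The split, per-row transpose and lateral merge stages can be mimicked in plain Python 3 to show how the pieces fit together. This is a sketch only: the three helper functions are hypothetical stand-ins for SplitText, the per-row script and the custom merge, and a thread pool stands in for NiFi's concurrent tasks. A real flow would also need some way to keep the split rows in their original order before merging.

```python
from concurrent.futures import ThreadPoolExecutor


def split_rows(text):
    """Stand-in for the split step: one string per non-empty input row."""
    return [line for line in text.splitlines() if line]


def row_to_column(row, delimiter=","):
    """Stand-in for the per-row transpose: one row becomes one column of fields."""
    return row.split(delimiter)


def merge_columns(columns, delimiter=","):
    """Stand-in for the lateral merge: join the i-th field of every column."""
    return [delimiter.join(fields) for fields in zip(*columns)]


def transpose_by_split_and_merge(text, workers=4):
    rows = split_rows(text)
    # Each row is transposed independently, so the work can run in parallel.
    # Executor.map returns results in input order, which keeps rows aligned.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        columns = list(pool.map(row_to_column, rows))
    return "\n".join(merge_columns(columns)) + "\n"


sample = (
    "Alfreds,Centro,Ernst,Island,Bacchus\n"
    "Germany,Mexico,Austria,UK,Canada\n"
    "01,02,03,04,05\n"
)
print(transpose_by_split_and_merge(sample), end="")
```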

Any of these approaches will require some level of custom development. If you are really hesitant to pursue that, you could try using ExecuteStreamCommand and one of the many bash solutions to do the transposition on the command line.



Answer 2:

@Andy,

This is also possible in NiFi without using ExecuteScript.

I extracted the 3 input rows as input.1, input.2 and input.3 in ExtractText. I then counted the number of columns in "input.1" using anyDelineatedValue in Expression Language and stored the result in the "TotalCount" attribute.

Initially, I set "Count=1".

Using a loop, I get the column at position "Count", then increment "Count" and check it in RouteOnAttribute with "le(totalcount)".

On each pass, I form the insert query using the "Count" attribute.
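To make the loop easier to follow, here is a rough Python 3 simulation of the same steps. It is not NiFi configuration: the function `build_inserts` and the table name `customers` are hypothetical, and the column names are taken from the question. Each statement is returned with its parameters kept separate rather than pasted into the SQL text.

```python
def build_inserts(input_1, input_2, input_3, table="customers"):
    """Simulate the Count / TotalCount loop and return (sql, params) pairs."""
    # The three extracted rows: names, cities and IDs.
    names, cities, ids = (line.split(",") for line in (input_1, input_2, input_3))
    total_count = len(names)  # number of columns in input.1
    sql = "INSERT INTO {} (Name, City, ID) VALUES (?, ?, ?)".format(table)

    statements = []
    count = 1  # initial value of the loop counter
    while count <= total_count:  # the "le" check that keeps the loop going
        i = count - 1  # Count is 1-based, Python lists are 0-based
        statements.append((sql, (names[i], cities[i], ids[i])))
        count += 1  # increment before the next pass
    return statements


for statement, params in build_inserts(
    "Alfreds,Centro,Ernst,Island,Bacchus",
    "Germany,Mexico,Austria,UK,Canada",
    "01,02,03,04,05",
):
    print(statement, params)
```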

It worked well for me. It could be useful for someone.