How to update line with modified data in Jython?

Posted 2019-03-04 05:54

Question:

I have a CSV file that contains hundreds of thousands of rows. Below are some sample lines:

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

The last column of the CSV data is in the wrong datetime format, so I need to convert it to the default datetime format ("YYYY-MM-DD hh:mm:ss[.nnn]").

I have tried the following script to read the lines and write them to the flow file:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
  session.transfer(flowFile, REL_SUCCESS)

But I could not find a way to convert the data into the output below:

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

I have searched with my friend (Google) and still could not find a solution.

Can anyone guide me in converting the input data into the required output?

Answer 1:

In this transformation the unnecessary data is located at the end of each line, so it is easy to handle the task with a regular expression:

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

Check the regular expression and its explanation here: https://regex101.com/r/sAB4SA/2
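To see what the groups capture, here is a minimal sketch in plain Python that applies the expression to sample lines from the question. The helper name strip_extra is hypothetical, made up for this illustration. A replacement function is used instead of a '\1\3' template because, as far as I know, older Python versions (including the 2.7 line that Jython follows) raise an error when a template refers to a group that did not take part in the match.

```python
import re

# Group 1: everything up to and including the seconds (":ss").
# Group 3: the decimal point plus at most three fractional digits.
# Group 4: any extra fractional digits; group 5: the trailing "-hh" offset.
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def strip_extra(line):
    """Keep groups 1 and 3, drop the extra digits and the offset (hypothetical helper)."""
    return PATTERN.sub(lambda m: m.group(1) + (m.group(3) or ''), line)


samples = [
    '1,Ni,23,28-02-2015 12:22:33.2212-02',
    '3,Us,33,30-03-2015 12:23:35-01',
    '4,Uk,34,31-03-2015 12:24:36.332211-02',
]
results = [strip_extra(s) for s in samples]
for r in results:
    print(r)
```

A line that contains no ":ss" part does not match at all, so it comes back unchanged.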

Since you have a large file, it is better not to load it into memory. The following code loads the whole file into memory:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

It is better to iterate line by line.

The following code is for the ExecuteScript NiFi processor with the Python (Jython) language:

import sys
import re
import traceback
from org.apache.nifi.processor.io import StreamCallback
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream, "UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream, "UTF-8"))
            # raw string, so the backslashes reach the regex engine unchanged
            p = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            line = reader.readLine()
            while line is not None:
                match = p.search(line)
                if match is None:
                    # no timestamp found on this line: pass it through unchanged
                    writer.write(line)
                else:
                    # keep everything up to the seconds plus at most 3 fractional digits
                    writer.write(match.group(1) + (match.group(3) or ''))
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            # log the stack trace, then re-raise so that NiFi sees the failure
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)
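
The line-by-line loop can also be tried outside NiFi. Below is a minimal sketch in plain Python, assuming in-memory text streams stand in for the flow file content; the function name transform_stream is hypothetical and not part of any NiFi API.

```python
import io
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')


def transform_stream(reader, writer):
    """Copy reader to writer one line at a time, trimming the last column."""
    count = 0
    for line in reader:  # a file-like object yields one line per iteration
        line = line.rstrip('\r\n')
        match = PATTERN.search(line)
        if match:
            line = match.group(1) + (match.group(3) or '')
        writer.write(line + '\n')
        count += 1
    return count


# In-memory streams used only for the demonstration.
source = io.StringIO(u'1,Ni,23,28-02-2015 12:22:33.2212-02\n'
                     u'3,Us,33,30-03-2015 12:23:35-01\n')
target = io.StringIO()
lines_written = transform_stream(source, target)
print(target.getvalue())
```

Only one line is held in memory at a time, so the same function should work with a file opened for reading in place of source.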

And since the question is about NiFi, here are alternatives that seem to be easier.


The same transformation as above, but in Groovy for the NiFi ExecuteScript processor (this version also skips the first line, as the question's script does):

def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
    // ## transform streams into reader and writer
    rawIn.withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            reader.eachLine{line, lineNum->
                if(lineNum>1) { // # skip the first line
                    // ## use a regular expression to transform each line
                    writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '$1$3' ) << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)

ReplaceText processor

And if a regular expression is acceptable, the easiest way in NiFi is the ReplaceText processor, which can do a regular expression replace line by line.

In this case you don't need to write any code, just build the regular expression and configure your processor correctly.



Answer 2:

This uses pure Jython. It is an example that can be adapted to the OP's needs.

Define a datetime parser for this CSV file:

from datetime import datetime
def parse_datetime(dtstr):
    mydatestr='-'.join(dtstr.split('-')[:-1])
    try:
        return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S.%f').strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]
    except ValueError:
        return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S').strftime('%d-%m-%Y %H:%M:%S')

My test.csv contains the following data (2015 had no 29 February, so I had to change the OP's example):

1,Ni,23,27-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

Now the solution:

with open('test.csv') as fi:
    for line in fi:
        line_split=line.split(',')
        out_line = ', '.join(word if i<3 else parse_datetime(word) for i,word in enumerate(line_split))
        #print(out_line)
        #you can write this out_line to a file here. 

Printing out_line gives:

1, Ni, 23, 27-02-2015 12:22:33.221
2, Fi, 21, 28-02-2015 12:22:34.321
3, Us, 33, 30-03-2015 12:23:35
4, Uk, 34, 31-03-2015 12:24:36.332
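
The loop above leaves the writing step open, and ', '.join puts a space after every comma. Here is a minimal sketch of one way to finish it, assuming the output should use plain commas like the input. The names convert_line and convert_file are hypothetical, and parse_datetime is repeated only so that the block runs on its own.

```python
from datetime import datetime


def parse_datetime(dtstr):
    # Same parser as above, repeated so that this block is self-contained.
    mydatestr = '-'.join(dtstr.split('-')[:-1])
    try:
        return datetime.strptime(mydatestr, '%d-%m-%Y %H:%M:%S.%f').strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]
    except ValueError:
        return datetime.strptime(mydatestr, '%d-%m-%Y %H:%M:%S').strftime('%d-%m-%Y %H:%M:%S')


def convert_line(line):
    """Rewrite only the last column and keep the rest as is (hypothetical helper)."""
    head, last = line.rstrip('\r\n').rsplit(',', 1)
    return head + ',' + parse_datetime(last)


def convert_file(in_path, out_path):
    # Read and write one line at a time so a large file is never held in memory.
    with open(in_path) as fi:
        with open(out_path, 'w') as fo:
            for line in fi:
                fo.write(convert_line(line) + '\n')


print(convert_line('4,Uk,34,31-03-2015 12:24:36.332211-02\n'))
```

Like the parser above, this assumes every timestamp ends with a "-hh" offset; a value without one would lose part of its date in the split.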


Answer 3:

You can get them with a regex:

(\d\d-\d\d-\d\d\d\d\ \d\d:\d\d:)(\d+(?:\.\d+)*)(-\d\d)$

Then just replace group #2 with a rounded version of group #2.

See regex example at regexr.com

You could even do it "nicer" by capturing every single value with its own capturing group, putting the values into a datetime.datetime object, and printing it from there, but IMHO that would be overkill for maintainability and would lose you too much performance.

I had no way to test this code:

import re
...
pattern = r'^(.{25})(\d+(?:\.\d+)*)(-\d\d)$'  # fixed offset of 25 characters used for simplicity

....

  for line in text[1:]:
    match = re.search(pattern, line)
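    # group 2 is a string: convert it before rounding, then format it back to text
    seconds = '%06.3f' % round(float(match.group(2)), 3)
    # drop match.group(3) here if the trailing offset should be stripped, as in the question's expected output
    line = match.group(1) + seconds + match.group(3)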
    outputStream.write(line + "\n")