I am working with Apache NiFi 0.5.1 on a Groovy script that replaces incoming JSON values with the ones contained in a mapping file. The mapping file looks like this (it is a simple .txt file):
Header1;Header2;Header3
A;some text;A2
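A mapping file in this format can be loaded into a lookup table keyed by its first column. The following is a minimal sketch. It assumes that the first line is always the header, that columns are separated by `;`, and that first-column values are unique. `parseMapping` is a hypothetical helper name:

```groovy
// Hypothetical helper: parses a ';'-separated mapping file whose first line is a header
// into a map keyed by the first column, e.g.
// [A: [Header1: 'A', Header2: 'some text', Header3: 'A2']]
Map<String, Map<String, String>> parseMapping(String text) {
    def lines = text.readLines().findAll { it.trim() }   // ignore blank lines
    def headers = lines.head().split(';')
    def table = [:]
    lines.tail().each { line ->
        def cols = line.split(';', -1)                    // -1 keeps trailing empty columns
        def row = [:]
        headers.eachWithIndex { header, i ->
            row[header] = i < cols.length ? cols[i] : ''  // missing columns become ''
        }
        table[cols[0]] = row
    }
    return table
}

def mappingText = '''Header1;Header2;Header3
A;some text;A2'''
def mapping = parseMapping(mappingText)
println mapping
```

In ExecuteScript, `mappingText` would be the result of `new File('/path/to/file').getText('UTF-8')`.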
I have started with the following:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if (flowFile == null) {
    return
}
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        // Hardcoded sample JSON (the incoming flowfile content is ignored here)
        def content = """
{
"field1": "A",
"field2": "A",
"field3": "A"
}"""
        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        builder.content.field1 = "A"
        builder.content.field2 = "some text"
        builder.content.field3 = "A2"
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
This first step works fine, although it is hardcoded and far from ideal. My initial thought was to use the ReplaceTextWithMapping processor to perform the substitutions, but it does not work well with complex mapping files (e.g. multi-column ones). I would like to take this further, but I am not sure how to go about it.
First of all, instead of passing in the entire hardcoded JSON, I would like to read the incoming flowfile. How is that possible in NiFi? Before running the script in ExecuteScript, I output a .json file with content, using UpdateAttribute to set filename = myResultingJSON.json. Furthermore, I know how to load a .txt file with Groovy (String mappingContent = new File('/path/to/file').getText('UTF-8')), but how do I use the loaded file to perform the substitutions so that my resulting JSON looks like this:
{
"field1": "A",
"field2": "some text",
"field3": "A2"
}
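Once the mapping is in memory, the substitution can be applied to the parsed JSON map directly. This avoids hardcoding each field. The following is a minimal sketch. It assumes a lookup table keyed by the incoming value (hardcoded here) and that field1/field2/field3 take their values from Header1/Header2/Header3, as in the desired output. `mapping`, `fieldToHeader` and `substitute` are hypothetical names:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical lookup table, e.g. as produced by parsing the mapping file:
// keyed by the incoming value, one entry per mapping-file column
def mapping = [A: [Header1: 'A', Header2: 'some text', Header3: 'A2']]

// Hypothetical: which mapping-file column feeds each JSON field
def fieldToHeader = [field1: 'Header1', field2: 'Header2', field3: 'Header3']

String substitute(String json, Map mapping, Map fieldToHeader) {
    def data = new JsonSlurper().parseText(json)
    fieldToHeader.each { field, header ->
        def row = mapping[data[field]]      // look the field's current value up in the table
        if (row != null && row.containsKey(header)) {
            data[field] = row[header]
        }
    }
    return JsonOutput.prettyPrint(JsonOutput.toJson(data))
}

def input = '{"field1": "A", "field2": "A", "field3": "A"}'
def result = substitute(input, mapping, fieldToHeader)
println result
```

Inside ExecuteScript, `input` would be the string read from the flowfile's InputStream, and `result` would be the string written to the OutputStream. Fields whose value has no entry in the table are left unchanged.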
Thank you for your help,
I.
EDIT:
My first modification to the script lets me read from the InputStream:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if (flowFile == null) {
    return
}
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        // Read the incoming flowfile content as a UTF-8 string
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        builder.content.field1 = "A"
        builder.content.field2 = "some text"
        builder.content.field3 = "A2"
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
I then moved on to testing the ConfigSlurper approach, writing a standalone test class before moving the logic into the Groovy ExecuteScript script:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
class TestLoadingMappings {
    static void main(String[] args) {
        def content = '''
{"field2":"A",
"field3": "A"
}
'''
        println "This is the content of the JSON file" + content
        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        println "This is the content of my builder " + builder
        def propertiesFile = new File("D:\\myFile.txt")
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def config = new ConfigSlurper().parse(props).flatten()
        println "This is the content of my config " + config
        config.each { k, v ->
            if (builder[k]) {
                builder[k] = v
            }
        }
        println(builder.toPrettyString())
    }
}
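There is one detail in the test class above that is separate from the mapping logic. `builder[k]` indexes the JsonBuilder object itself, not the parsed JSON. Groovy resolves that subscript as a property lookup on JsonBuilder, which throws groovy.lang.MissingPropertyException for a key such as field2. The parsed map is available as `builder.content`, which EDIT 3 below also uses. The following small sketch shows the difference:

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

def builder = new JsonBuilder(new JsonSlurper().parseText('{"field2": "A", "field3": "A"}'))

// builder['field2'] asks the JsonBuilder object itself for a property named field2,
// which it does not have, so Groovy throws MissingPropertyException
def error = null
try {
    builder['field2']
} catch (MissingPropertyException e) {
    error = e
}

// The parsed JSON map is exposed as builder.content, and that can be indexed by key
builder.content['field2'] = 'some text'
println "Caught: ${error?.class?.simpleName}"
println builder.toPrettyString()
```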
I get a groovy.lang.MissingPropertyException, and this is because the mapping is not that straightforward. All fields/properties (field1 to field3) come into the InputStream with the same value (e.g. "A"). This means that whenever field2, for example, has that value, you can be certain the other two properties have it too. However, I cannot have a mapping entry such as "field2" : "someText", because the actual mapping is driven by the first value in the mapping file. Here is an example:
{
"field1": "A",
"field2": "A",
"field3": "A"
}
In my mapping file I have:
A;some text;A2
However, field1 needs to map to A (the first value in the file), or simply stay the same. field2 needs to map to the value in the last column (A2), and finally field3 needs to map to 'some text' in the middle column.
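The rule described above has three parts. field1 keeps the first column, field2 takes the last column, and field3 takes the middle one. In each case, the row is chosen by the field's incoming value. This rule can be expressed as a small field-to-column table, so the per-field choice lives in data rather than in if-statements. The following is a minimal sketch. It assumes the mapping lines have no header and that each field is looked up by its own incoming value. `columnFor`, `rows` and `applyRow` are hypothetical names:

```groovy
// Hypothetical: 0-based column index in the mapping file for each JSON field
def columnFor = [field1: 0, field2: 2, field3: 1]

// Mapping rows keyed by their first column
def rows = [:]
['A;some text;A2', 'B;some other text;B2'].each { line ->
    def cols = line.split(';')
    rows[cols[0]] = cols
}

Map applyRow(Map json, Map rows, Map columnFor) {
    columnFor.each { field, idx ->
        String[] cols = rows[json[field]]   // null when the value has no mapping row
        if (cols != null && idx < cols.length) {
            json[field] = cols[idx]
        }
    }
    return json
}

def result = applyRow([field1: 'A', field2: 'A', field3: 'A'], rows, columnFor)
println result
```

With this approach, changing which column feeds which field only requires editing `columnFor`. That table could itself be read from a separate config file.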
Can you help with this? Is this something I can achieve with Groovy and ExecuteScript? If needed, I can split the config file into two.
Also, I have had a quick look at the other option (PutDistributedMapCache), and I am not sure I understand how to load key-value pairs into a distributed map cache. It looks like you would need a DistributedMapCacheClient, and I am not sure how easy that is to implement.
Thank you!
EDIT 2:
Some further progress: I now have the mapping working, but I am not sure why it fails when reading the second line of the properties file:
"A" someText
"A2" anotherText
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
class TestLoadingMappings {
    static void main(String[] args) {
        def content = '''
{"field2":"A",
"field3":"A"
}
'''
        println "This is the content of the JSON file" + content
        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)
        println "This is the content of my builder " + builder
        assert builder.content.field2 == "A"
        assert builder.content.field3 == "A"
        def propertiesFile = new File('D:\\myTest.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        println "This is the content of the properties " + props
        def config = new ConfigSlurper().parse(props).flatten()
        config.each { k, v ->
            if (builder.content.field2) {
                builder.content.field2 = config[k]
            }
            if (builder.content.field3) {
                builder.content.field3 = config[k]
            }
            println(builder.toPrettyString())
            println "This is my builder " + builder
        }
    }
}
The output I get is: This is my builder {"field2":"someText","field3":"someText"}
Any idea why?
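Two things in this test class are worth checking. First, both `if` conditions only test whether field2/field3 are non-empty, which is always true. As a result, every entry in `config` overwrites both fields with its own value, whatever its key. Second, java.util.Properties ends each key at the first unescaped whitespace, `=` or `:`, so the double quotes in the file stay part of the keys (`"A"` rather than `A`). The following is a minimal sketch of a conditional version. It assumes the two lines shown in this edit and a hypothetical input in which the two fields carry different values:

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// The two lines of the properties file, loaded from a String for a standalone check.
// java.util.Properties keeps the double quotes as part of each key ("A", "A2").
def props = new Properties()
props.load(new StringReader('"A" someText\n"A2" anotherText\n'))

// Strip the surrounding quotes so the keys match the raw JSON values
def config = props.collectEntries { k, v -> [(k.toString().replaceAll('^"|"$', '')): v] }

// Hypothetical input in which the two fields carry different incoming values
def builder = new JsonBuilder(new JsonSlurper().parseText('{"field2":"A","field3":"A2"}'))

['field2', 'field3'].each { field ->
    def current = builder.content[field]
    // Replace only when this field's current value has an entry in the mapping
    if (config.containsKey(current)) {
        builder.content[field] = config[current]
    }
}
println builder.toPrettyString()
```

In this version, each field is visited once and looked up by its own value. An entry that does not match a field's value never touches that field.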
Thank you so much
EDIT 3 (Moved from below)
I have written the following:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
class TestLoadingMappings {
    static void main(String[] args) {
        def content =
'''
{"field2":"A",
"field3":"A"
}
'''
        def slurper = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurper)
        println "This is the content of my builder " + builder
        def propertiesFile = new File('D:\\properties.txt')
        Properties props = new Properties()
        props.load(new FileInputStream(propertiesFile))
        def conf = new ConfigSlurper().parse(props).flatten()
        conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
            println("This prints the resulting JSON :" + builder.toPrettyString())
        }
    }
}
However, I had to change the structure of the mapping file as follows:
"field1"="substitutionText"
"field2"="substitutionText2"
I then incorporated the ConfigSlurper logic into the ExecuteScript script, as follows:
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if (flowFile == null) {
    return
}
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def slurped = new JsonSlurper().parseText(content)
        def builder = new JsonBuilder(slurped)
        // Load the mappings; withInputStream closes the stream afterwards
        def propertiesFile = new File('D:\\properties.txt')
        Properties props = new Properties()
        propertiesFile.withInputStream { props.load(it) }
        def conf = new ConfigSlurper().parse(props).flatten()
        // Replace each field whose key appears in the mapping and currently has a non-empty value
        conf.each { k, v ->
            if (builder.content[k]) {
                builder.content[k] = v
            }
        }
        // Write the modified JSON once, after all substitutions
        outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
The problem seems to be that I cannot replicate the logic of the original mapping file using something similar to what I created in my TestLoadingMappings class. As mentioned in my previous comments/edits, the mapping should work like this:
field2 = if A then substitute with "some text"
field3 = if A then substitute with A2
...
field2 = if B then substitute with "some other text"
field3 = if B then substitute with B2
and so on.
In a nutshell, the mappings are driven by the incoming value in the InputStream (which varies), and that value maps to different results depending on the JSON attribute. Can you please recommend a better way to achieve this mapping in Groovy/ExecuteScript? I have the flexibility to modify the mapping file. Can you see a way to change it so that it produces the desired mappings?
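Because the mapping file format is flexible, one option is to make each key encode both the JSON field and the incoming value, e.g. `field2.A=some text`. A single flat file can then express all the per-field rules listed above. The following is a minimal sketch. It assumes that hypothetical key format and uses plain java.util.Properties, which is enough for a flat key lookup. `applyRules` is a hypothetical name:

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical mapping file format: <field>.<incoming value>=<replacement>
def mappingText = '''
field2.A=some text
field3.A=A2
field2.B=some other text
field3.B=B2
'''

def rules = new Properties()
rules.load(new StringReader(mappingText))

String applyRules(String json, Properties rules) {
    def data = new JsonSlurper().parseText(json)
    // Look up "<field>.<current value>"; keep the original value when there is no rule
    def result = data.collectEntries { field, value ->
        def replacement = rules.getProperty("${field}.${value}".toString())
        [(field): replacement != null ? replacement : value]
    }
    return JsonOutput.toJson(result)
}

println applyRules('{"field1":"A","field2":"A","field3":"A"}', rules)
println applyRules('{"field1":"B","field2":"B","field3":"B"}', rules)
```

Inside ExecuteScript, `rules` could be loaded from the mapping file with `new File(...).withInputStream { rules.load(it) }`, and `applyRules` applied to the string read from the flowfile's InputStream. Properties treats `=`, `:` and unescaped whitespace as the end of a key. Incoming values containing spaces would therefore need to be escaped (`\ `) in the keys.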
Thanks