NiFi processor is not parsing JSON correctly

Posted 2019-06-06 17:13

Question:

I am using EvaluateJsonPath to extract one particular value from a JSON document, with the following JSONPath expression:

 $.data[?(@.containerType == 'SOURCE' && @.path == 'SOURCE_KYLO_DATALAKE')].id

This is the JSON document I'm evaluating the JSONPath against:

{"data":[{"id":"dc18bf87-c5a6-4600-9584-e79fb988b1d0","path":["@Rakesh.Prasad@diageo.com"],"tag":"0","type":"CONTAINER","containerType":"HOME"},{"id":"42e52055-4deb-4d5d-942f-4e1c4e48c35e","path":["BPM"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"49e3d118-e4f9-41ef-ad97-6b2745c75c4f","path":["DATABRICKS_USAGE_REPORT"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"613f52e9-64df-4c9c-b083-c282f349eb4e","path":["LIGHTHOUSE"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"f57bcd83-4d0e-481e-b880-0fb8b20798a1","path":["MDM"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"745cd2d5-7303-4c0a-9cab-f5205b9eec90","path":["NIELSEN"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"b40da338-c429-4bb3-b2ef-51295a143fc8","path":["PowerBI"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"dffd025c-b0f0-4b9b-9060-da4aa54204d1","path":["REFERENCE_DATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"14f9759a-2059-4728-acad-fe01f129f148","path":["SAP_ODP_MASTERDATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"063bb5e8-041a-4f69-98a3-d2509d5e89d0","path":["TRAX"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"9c737147-6632-4328-bf10-ba4959a2806f","path":["TRAX_API"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"99167858-17ca-406f-b887-62af3d0da68a","path":["DEPLETION"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"52f17de1-a66e-4f08-9077-04acf3914663","path":["SOURCE_ADLS_NIELSEN_PROD"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"bea0de9c-b579-46bd-89ff-4b9497c3910e","path":["SOURCE_KYLO_DATALAKE"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"20985e83-cd31-469e-9a17-1e586bccfb27","path":["SOURCE_LIGHTHOUSE_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"47406901-c9ce-4fce-b0ab-37b07338949b","path":["SOURCE_MDM_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"
id":"f1feff7d-8ada-46bb-a5fe-0283a2c746b3","path":["SOURCE_MDS_UAT"],"tag":"0","type":"CONTAINER","containerType":"SOURCE"},{"id":"48a5d1b6-8d32-449d-a317-d242f2394e71","path":["SOURCE_NIELSEN_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"32eaeeb5-60d5-4d87-a983-1e71e3543920","path":["SOURCE_PROD_BPM"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f4af00a5-a536-4272-93cb-891ec13ef8e4","path":["SOURCE_SAP_MDS_STAGING"],"tag":"3","type":"CONTAINER","containerType":"SOURCE"},{"id":"7250d605-75a9-4ef2-a01b-55c2bcb44dd9","path":["SOURCE_TRAX_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"38a8293e-72f4-42c2-be66-667b21a1ac55","path":["SOURCE_KYLO_HIVE2"],"tag":"10","type":"CONTAINER","containerType":"SOURCE"},{"id":"95cb9f2f-3421-451a-8635-bb8487dc1872","path":["dwlprd1"],"tag":"7","type":"CONTAINER","containerType":"SOURCE"},{"id":"ac9334e4-daf2-4c6f-92f1-0452440fb737","path":["dwlprd2"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"c27af9bd-075b-4fb8-bcd4-8450f26ff7f9","path":["SOURCE_ADLS_NIELSEN_DEPLETION_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"}]}

When I run this JSONPath query in an online JSONPath testing tool (see attached image), I get the expected result, but NiFi returns an empty array.

Template:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.2"><description></description><groupId>ae48862f-0165-1000-cc45-c1efcbb7ff08</groupId><name>dnu_jsonpath</name><snippet><connections><id>d84c0b8e-6983-3f0e-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>509810d8-4798-30e5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>02ff8ff3-ed1e-34b1-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>8d45c558-a4a7-3529-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><selectedRelationships>unmatched</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>6a3afe0c-951a-33fc-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 
GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>ab89e6d1-f08e-32be-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>8d45c558-a4a7-3529-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>607.0</x><y>151.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to 
Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>ab89e6d1-f08e-32be-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>715.0</x><y>468.99999999999994</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log 
Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>509810d8-4798-30e5-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>0.0</x><y>0.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>File Size</key><value><name>File Size</name></value></entry><entry><key>Batch Size</key><value><name>Batch Size</name></value></entry><entry><key>Data Format</key><value><name>Data Format</name></value></entry><entry><key>Unique FlowFiles</key><value><name>Unique FlowFiles</name></value></entry><entry><key>generate-ff-custom-text</key><value><name>generate-ff-custom-text</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>0B</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data 
Format</key><value>Text</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry><entry><key>generate-ff-custom-text</key><value>{"data":[{"id":"dc18bf87-c5a6-4600-9584-e79fb988b1d0","path":["@Rakesh.Prasad@diageo.com"],"tag":"0","type":"CONTAINER","containerType":"HOME"},{"id":"42e52055-4deb-4d5d-942f-4e1c4e48c35e","path":["BPM"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"49e3d118-e4f9-41ef-ad97-6b2745c75c4f","path":["DATABRICKS_USAGE_REPORT"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"613f52e9-64df-4c9c-b083-c282f349eb4e","path":["LIGHTHOUSE"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"f57bcd83-4d0e-481e-b880-0fb8b20798a1","path":["MDM"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"745cd2d5-7303-4c0a-9cab-f5205b9eec90","path":["NIELSEN"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"b40da338-c429-4bb3-b2ef-51295a143fc8","path":["PowerBI"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"dffd025c-b0f0-4b9b-9060-da4aa54204d1","path":["REFERENCE_DATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"14f9759a-2059-4728-acad-fe01f129f148","path":["SAP_ODP_MASTERDATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"063bb5e8-041a-4f69-98a3-d2509d5e89d0","path":["TRAX"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"9c737147-6632-4328-bf10-ba4959a2806f","path":["TRAX_API"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"99167858-17ca-406f-b887-62af3d0da68a","path":["DEPLETION"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"52f17de1-a66e-4f08-9077-04acf3914663","path":["SOURCE_ADLS_NIELSEN_PROD"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"bea0de9c-b579-46bd-89ff-4b9497c3910e","path":["SOURCE_KYLO_DATALAKE"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"20985e83-cd31-469e-9a17-1e586bccfb27","path":["SOURCE_LIGHTHOUSE_UAT"],"tag":"1","type":"CONTAINER","cont
ainerType":"SOURCE"},{"id":"47406901-c9ce-4fce-b0ab-37b07338949b","path":["SOURCE_MDM_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f1feff7d-8ada-46bb-a5fe-0283a2c746b3","path":["SOURCE_MDS_UAT"],"tag":"0","type":"CONTAINER","containerType":"SOURCE"},{"id":"48a5d1b6-8d32-449d-a317-d242f2394e71","path":["SOURCE_NIELSEN_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"32eaeeb5-60d5-4d87-a983-1e71e3543920","path":["SOURCE_PROD_BPM"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f4af00a5-a536-4272-93cb-891ec13ef8e4","path":["SOURCE_SAP_MDS_STAGING"],"tag":"3","type":"CONTAINER","containerType":"SOURCE"},{"id":"7250d605-75a9-4ef2-a01b-55c2bcb44dd9","path":["SOURCE_TRAX_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"38a8293e-72f4-42c2-be66-667b21a1ac55","path":["SOURCE_KYLO_HIVE2"],"tag":"10","type":"CONTAINER","containerType":"SOURCE"},{"id":"95cb9f2f-3421-451a-8635-bb8487dc1872","path":["dwlprd1"],"tag":"7","type":"CONTAINER","containerType":"SOURCE"},{"id":"ac9334e4-daf2-4c6f-92f1-0452440fb737","path":["dwlprd2"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"c27af9bd-075b-4fb8-bcd4-8450f26ff7f9","path":["SOURCE_ADLS_NIELSEN_DEPLETION_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"}]}</value></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>1 day</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 
sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>7d993abd-1c1e-3cc5-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>107.0</x><y>256.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Destination</key><value><name>Destination</name></value></entry><entry><key>Return Type</key><value><name>Return Type</name></value></entry><entry><key>Path Not Found Behavior</key><value><name>Path Not Found Behavior</name></value></entry><entry><key>Null Value Representation</key><value><name>Null Value Representation</name></value></entry><entry><key>dataset</key><value><name>dataset</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Destination</key><value>flowfile-content</value></entry><entry><key>Return Type</key><value>auto-detect</value></entry><entry><key>Path Not Found Behavior</key><value>warn</value></entry><entry><key>Null Value Representation</key><value>empty string</value></entry><entry><key>dataset</key><value>$.data[?(@.containerType == "SOURCE" &amp;&amp; @.path == "SOURCE_KYLO_DATALAKE")].id</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 
sec</yieldDuration></config><name>EvaluateJsonPath</name><relationships><autoTerminate>false</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>false</autoTerminate><name>unmatched</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.EvaluateJsonPath</type></processors></snippet><timestamp>09/24/2018 06:03:42 EDT</timestamp></template>

Answer 1:

Because you are searching for a value in the path array, enclose 'SOURCE_KYLO_DATALAKE' in [] (an array). The processor will then output only the matching id value as the flowfile content.

Change the EvaluateJsonPath property value as shown below:

dataset

$.data[?(@.containerType == 'SOURCE' && @.path == ['SOURCE_KYLO_DATALAKE'])].id

Output Flowfile Content:

["bea0de9c-b579-46bd-89ff-4b9497c3910e"]
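The difference between the two expressions can be illustrated outside NiFi. The following is a minimal sketch in plain Python, assuming strict equality semantics; it is not the JSONPath engine NiFi uses, and `select_ids` is a hypothetical helper written only for this illustration. It shows that an array-valued `path` never equals a bare string but does equal a one-element array.

```python
import json

# Shortened copy of the document from the question (two of its entries).
doc = json.loads("""
{"data": [
  {"id": "52f17de1-a66e-4f08-9077-04acf3914663",
   "path": ["SOURCE_ADLS_NIELSEN_PROD"], "containerType": "SOURCE"},
  {"id": "bea0de9c-b579-46bd-89ff-4b9497c3910e",
   "path": ["SOURCE_KYLO_DATALAKE"], "containerType": "SOURCE"}
]}
""")


def select_ids(data, wanted_path):
    """Hypothetical stand-in for the filter: keep the ids of entries whose
    containerType is SOURCE and whose path equals wanted_path."""
    return [
        item["id"]
        for item in data
        if item["containerType"] == "SOURCE" and item["path"] == wanted_path
    ]


# Comparing the array-valued path to a bare string never matches.
ids_string = select_ids(doc["data"], "SOURCE_KYLO_DATALAKE")
# Comparing it to a one-element array matches the intended entry.
ids_array = select_ids(doc["data"], ["SOURCE_KYLO_DATALAKE"])

print(ids_string)  # []
print(ids_array)   # ['bea0de9c-b579-46bd-89ff-4b9497c3910e']
```

Online JSONPath testers may be more lenient about comparing an array with a string, which would explain why the original expression appeared to work there.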

UPDATE:

I used NiFi 1.7.1, and the EvaluateJsonPath expression above works fine in that version.

However, if you are using another version of NiFi:

1. If the array has only one element, use the expression below in your EvaluateJsonPath processor.

dataset

$.data[?(@.containerType == 'SOURCE'  && @.path[0] == 'SOURCE_KYLO_DATALAKE')].id

2. If the array has more than one element, use the flow described below.

Flow Description:

1. SplitJson

  • Splits the data array into individual messages.
  • Set the JsonPath Expression property to $.data.

2. EvaluateJsonPath

  • Extracts the required values and stores them as flowfile attributes. After this step, id, containerType, and path are available as attributes on each flowfile.

3. RouteOnAttribute

  • Checks the attribute values using NiFi Expression Language.

  • Add a new property named required with the following value:

    ${containerType:equals("SOURCE"):and(${anyDelineatedValue("${path:replace('[',''):replace(']','')}",","):equals('"SOURCE_KYLO_DATALAKE"')})}

Connect the required relationship to a ReplaceText processor.

4. ReplaceText

  • Replaces the flowfile content with the id value.

The output flowfile from the ReplaceText processor then contains the id value as its content.
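The four steps above can be sketched in plain Python to show the intended data flow. This is only an emulation under stated assumptions, not NiFi code: every function name is hypothetical, the sample ids are made up, and the `path` attribute is assumed to hold the array as compact JSON text such as `["A","B"]`.

```python
import json


def split_json(content):
    """SplitJson stand-in: one record per element of $.data."""
    return json.loads(content)["data"]


def evaluate_json_path(record):
    """EvaluateJsonPath stand-in: copy fields into string attributes."""
    return {
        "id": record["id"],
        "containerType": record["containerType"],
        # Assumption: the array is stored as compact JSON text.
        "path": json.dumps(record["path"], separators=(",", ":")),
    }


def is_required(attrs, wanted):
    """RouteOnAttribute stand-in mirroring the 'required' expression:
    strip the brackets, split on commas, compare each quoted value."""
    stripped = attrs["path"].replace("[", "").replace("]", "")
    values = stripped.split(",")
    quoted = '"' + wanted + '"'
    return attrs["containerType"] == "SOURCE" and any(v == quoted for v in values)


def run_flow(content, wanted):
    """ReplaceText stand-in: emit the id of each record routed to 'required'."""
    return [
        attrs["id"]
        for attrs in map(evaluate_json_path, split_json(content))
        if is_required(attrs, wanted)
    ]


# Made-up sample with a multi-element path array.
sample = json.dumps({"data": [
    {"id": "a-1", "path": ["OTHER", "SOURCE_KYLO_DATALAKE"], "containerType": "SOURCE"},
    {"id": "b-2", "path": ["SOURCE_KYLO_DATALAKE"], "containerType": "SPACE"},
    {"id": "c-3", "path": ["SOURCE_KYLO_HIVE2"], "containerType": "SOURCE"},
]})

matched = run_flow(sample, "SOURCE_KYLO_DATALAKE")
print(matched)  # ['a-1']
```

Because this sketch splits on commas after stripping brackets, it would misbehave if a path element itself contained a comma or a bracket.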

If possible, upgrade NiFi to version 1.7.1; then you don't need any of these workarounds :)