Nifi processor is not parsing JSON correctly

2019-06-06 17:23发布

I am using EvaluateJsonPath to extract one particular value from JSON. I am using the follwoing JSONPath expression:

 $.data[?(@.containerType == 'SOURCE' && @.path == 'SOURCE_KYLO_DATALAKE')].id

This is the JSON document I'm calling the JSONPath on :

{"data":[{"id":"dc18bf87-c5a6-4600-9584-e79fb988b1d0","path":["@Rakesh.Prasad@diageo.com"],"tag":"0","type":"CONTAINER","containerType":"HOME"},{"id":"42e52055-4deb-4d5d-942f-4e1c4e48c35e","path":["BPM"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"49e3d118-e4f9-41ef-ad97-6b2745c75c4f","path":["DATABRICKS_USAGE_REPORT"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"613f52e9-64df-4c9c-b083-c282f349eb4e","path":["LIGHTHOUSE"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"f57bcd83-4d0e-481e-b880-0fb8b20798a1","path":["MDM"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"745cd2d5-7303-4c0a-9cab-f5205b9eec90","path":["NIELSEN"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"b40da338-c429-4bb3-b2ef-51295a143fc8","path":["PowerBI"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"dffd025c-b0f0-4b9b-9060-da4aa54204d1","path":["REFERENCE_DATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"14f9759a-2059-4728-acad-fe01f129f148","path":["SAP_ODP_MASTERDATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"063bb5e8-041a-4f69-98a3-d2509d5e89d0","path":["TRAX"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"9c737147-6632-4328-bf10-ba4959a2806f","path":["TRAX_API"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"99167858-17ca-406f-b887-62af3d0da68a","path":["DEPLETION"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"52f17de1-a66e-4f08-9077-04acf3914663","path":["SOURCE_ADLS_NIELSEN_PROD"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"bea0de9c-b579-46bd-89ff-4b9497c3910e","path":["SOURCE_KYLO_DATALAKE"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"20985e83-cd31-469e-9a17-1e586bccfb27","path":["SOURCE_LIGHTHOUSE_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"47406901-c9ce-4fce-b0ab-37b07338949b","path":["SOURCE_MDM_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f1feff7d-8ada-46bb-a5fe-0283a2c746b3","path":["SOURCE_MDS_UAT"],"tag":"0","type":"CONTAINER","containerType":"SOURCE"},{"id":"48a5d1b6-8d32-449d-a317-d242f2394e71","path":["SOURCE_NIELSEN_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"32eaeeb5-60d5-4d87-a983-1e71e3543920","path":["SOURCE_PROD_BPM"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f4af00a5-a536-4272-93cb-891ec13ef8e4","path":["SOURCE_SAP_MDS_STAGING"],"tag":"3","type":"CONTAINER","containerType":"SOURCE"},{"id":"7250d605-75a9-4ef2-a01b-55c2bcb44dd9","path":["SOURCE_TRAX_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"38a8293e-72f4-42c2-be66-667b21a1ac55","path":["SOURCE_KYLO_HIVE2"],"tag":"10","type":"CONTAINER","containerType":"SOURCE"},{"id":"95cb9f2f-3421-451a-8635-bb8487dc1872","path":["dwlprd1"],"tag":"7","type":"CONTAINER","containerType":"SOURCE"},{"id":"ac9334e4-daf2-4c6f-92f1-0452440fb737","path":["dwlprd2"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"c27af9bd-075b-4fb8-bcd4-8450f26ff7f9","path":["SOURCE_ADLS_NIELSEN_DEPLETION_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"}]}

When I use the configuration(that specific JSONPath query) from above on a JSONPath online testing tool (see attached image), I get the expected result. But somehow nifi is returning empty array.

enter image description here

Template:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.2"><description></description><groupId>ae48862f-0165-1000-cc45-c1efcbb7ff08</groupId><name>dnu_jsonpath</name><snippet><connections><id>d84c0b8e-6983-3f0e-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>509810d8-4798-30e5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>02ff8ff3-ed1e-34b1-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>8d45c558-a4a7-3529-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><selectedRelationships>unmatched</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>6a3afe0c-951a-33fc-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>ab89e6d1-f08e-32be-0000-000000000000</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>5842a0b1-f01b-3160-0000-000000000000</groupId><id>7d993abd-1c1e-3cc5-0000-000000000000</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>8d45c558-a4a7-3529-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>607.0</x><y>151.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>ab89e6d1-f08e-32be-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>715.0</x><y>468.99999999999994</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Log Level</key><value><name>Log Level</name></value></entry><entry><key>Log Payload</key><value><name>Log Payload</name></value></entry><entry><key>Attributes to Log</key><value><name>Attributes to Log</name></value></entry><entry><key>attributes-to-log-regex</key><value><name>attributes-to-log-regex</name></value></entry><entry><key>Attributes to Ignore</key><value><name>Attributes to Ignore</name></value></entry><entry><key>attributes-to-ignore-regex</key><value><name>attributes-to-ignore-regex</name></value></entry><entry><key>Log prefix</key><value><name>Log prefix</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key><value>info</value></entry><entry><key>Log Payload</key><value>false</value></entry><entry><key>Attributes to Log</key></entry><entry><key>attributes-to-log-regex</key><value>.*</value></entry><entry><key>Attributes to Ignore</key></entry><entry><key>attributes-to-ignore-regex</key></entry><entry><key>Log prefix</key></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>509810d8-4798-30e5-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>0.0</x><y>0.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>File Size</key><value><name>File Size</name></value></entry><entry><key>Batch Size</key><value><name>Batch Size</name></value></entry><entry><key>Data Format</key><value><name>Data Format</name></value></entry><entry><key>Unique FlowFiles</key><value><name>Unique FlowFiles</name></value></entry><entry><key>generate-ff-custom-text</key><value><name>generate-ff-custom-text</name></value></entry><entry><key>character-set</key><value><name>character-set</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>0B</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Text</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry><entry><key>generate-ff-custom-text</key><value>{"data":[{"id":"dc18bf87-c5a6-4600-9584-e79fb988b1d0","path":["@Rakesh.Prasad@diageo.com"],"tag":"0","type":"CONTAINER","containerType":"HOME"},{"id":"42e52055-4deb-4d5d-942f-4e1c4e48c35e","path":["BPM"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"49e3d118-e4f9-41ef-ad97-6b2745c75c4f","path":["DATABRICKS_USAGE_REPORT"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"613f52e9-64df-4c9c-b083-c282f349eb4e","path":["LIGHTHOUSE"],"tag":"3","type":"CONTAINER","containerType":"SPACE"},{"id":"f57bcd83-4d0e-481e-b880-0fb8b20798a1","path":["MDM"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"745cd2d5-7303-4c0a-9cab-f5205b9eec90","path":["NIELSEN"],"tag":"2","type":"CONTAINER","containerType":"SPACE"},{"id":"b40da338-c429-4bb3-b2ef-51295a143fc8","path":["PowerBI"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"dffd025c-b0f0-4b9b-9060-da4aa54204d1","path":["REFERENCE_DATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"14f9759a-2059-4728-acad-fe01f129f148","path":["SAP_ODP_MASTERDATA"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"063bb5e8-041a-4f69-98a3-d2509d5e89d0","path":["TRAX"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"9c737147-6632-4328-bf10-ba4959a2806f","path":["TRAX_API"],"tag":"0","type":"CONTAINER","containerType":"SPACE"},{"id":"99167858-17ca-406f-b887-62af3d0da68a","path":["DEPLETION"],"tag":"1","type":"CONTAINER","containerType":"SPACE"},{"id":"52f17de1-a66e-4f08-9077-04acf3914663","path":["SOURCE_ADLS_NIELSEN_PROD"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"bea0de9c-b579-46bd-89ff-4b9497c3910e","path":["SOURCE_KYLO_DATALAKE"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"20985e83-cd31-469e-9a17-1e586bccfb27","path":["SOURCE_LIGHTHOUSE_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"47406901-c9ce-4fce-b0ab-37b07338949b","path":["SOURCE_MDM_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f1feff7d-8ada-46bb-a5fe-0283a2c746b3","path":["SOURCE_MDS_UAT"],"tag":"0","type":"CONTAINER","containerType":"SOURCE"},{"id":"48a5d1b6-8d32-449d-a317-d242f2394e71","path":["SOURCE_NIELSEN_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"32eaeeb5-60d5-4d87-a983-1e71e3543920","path":["SOURCE_PROD_BPM"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"f4af00a5-a536-4272-93cb-891ec13ef8e4","path":["SOURCE_SAP_MDS_STAGING"],"tag":"3","type":"CONTAINER","containerType":"SOURCE"},{"id":"7250d605-75a9-4ef2-a01b-55c2bcb44dd9","path":["SOURCE_TRAX_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"},{"id":"38a8293e-72f4-42c2-be66-667b21a1ac55","path":["SOURCE_KYLO_HIVE2"],"tag":"10","type":"CONTAINER","containerType":"SOURCE"},{"id":"95cb9f2f-3421-451a-8635-bb8487dc1872","path":["dwlprd1"],"tag":"7","type":"CONTAINER","containerType":"SOURCE"},{"id":"ac9334e4-daf2-4c6f-92f1-0452440fb737","path":["dwlprd2"],"tag":"5","type":"CONTAINER","containerType":"SOURCE"},{"id":"c27af9bd-075b-4fb8-bcd4-8450f26ff7f9","path":["SOURCE_ADLS_NIELSEN_DEPLETION_UAT"],"tag":"1","type":"CONTAINER","containerType":"SOURCE"}]}</value></entry><entry><key>character-set</key><value>UTF-8</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>1 day</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>7d993abd-1c1e-3cc5-0000-000000000000</id><parentGroupId>5842a0b1-f01b-3160-0000-000000000000</parentGroupId><position><x>107.0</x><y>256.0</y></position><bundle><artifact>nifi-standard-nar</artifact><group>org.apache.nifi</group><version>1.6.0</version></bundle><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><descriptors><entry><key>Destination</key><value><name>Destination</name></value></entry><entry><key>Return Type</key><value><name>Return Type</name></value></entry><entry><key>Path Not Found Behavior</key><value><name>Path Not Found Behavior</name></value></entry><entry><key>Null Value Representation</key><value><name>Null Value Representation</name></value></entry><entry><key>dataset</key><value><name>dataset</name></value></entry></descriptors><executionNode>ALL</executionNode><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Destination</key><value>flowfile-content</value></entry><entry><key>Return Type</key><value>auto-detect</value></entry><entry><key>Path Not Found Behavior</key><value>warn</value></entry><entry><key>Null Value Representation</key><value>empty string</value></entry><entry><key>dataset</key><value>$.data[?(@.containerType == "SOURCE" &amp;&amp; @.path == "SOURCE_KYLO_DATALAKE")].id</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>EvaluateJsonPath</name><relationships><autoTerminate>false</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>false</autoTerminate><name>unmatched</name></relationships><state>STOPPED</state><style/><type>org.apache.nifi.processors.standard.EvaluateJsonPath</type></processors></snippet><timestamp>09/24/2018 06:03:42 EDT</timestamp></template>

1条回答
欢心
2楼-- · 2019-06-06 17:32

As you are searching for value in path array, Enclose SOURCE_KYLO_DATALAKE in [](array) then processor will only result the matching id value as output content.

Change the Eval JsonPath property value as below

dataset

$.data[?(@.containerType == 'SOURCE' && @.path == ['SOURCE_KYLO_DATALAKE'])].id

Configs: enter image description here

Output Flowfile Content:

["bea0de9c-b579-46bd-89ff-4b9497c3910e"]

UPDATE:

I have used NiFi-1.7.1 and EvaluateJson expression works fine in this version.

However if you are using other versions of NiFi then

1.if you are having only one element in array then use below expression in your EvaluateJsonPath processor.

dataset

$.data[?(@.containerType == 'SOURCE'  && @.path[0] == 'SOURCE_KYLO_DATALAKE')].id

2.If you are having more than one element in array then

Flow:

enter image description here

Flow Description:

1.SplitJson

  • to split data array into individual messages
  • configure JsonPathExpression to $.data

2.EvaluateJsonPath

  • To extract required content and keep as attribute to the flowfile enter image description here Now we are having id,containerType,path values as attributes to the flowfile

3.RouteOnAttribute:

  • in this processor we are checking the attribute values using NiFi expression language

  • Add new property as

    required

    ${containerType:equals("SOURCE"):and(${anyDelineatedValue("${path:replace('[',''):replace(']','')}",","):equals('"SOURCE_KYLO_DATALAKE"')})}

enter image description here

Feed the required relationship to ReplaceText processor

4.ReplaceText

  • Now we are replacing the id values to the flowfile content

enter image description here

Now we are going to have the id value in your output flowfile content from ReplaceText processor.

If possible upgrade the NiFi version to 1.7.1 then you don't need to do all these workarounds :)

查看更多
登录 后发表回答