Zero flow files in onTrigger() method of AbstractP

2019-08-21 10:43发布

问题:

I am developing a custom processor for Apache NiFi. I have created nar of my processor and put it in the lib folder of nifi and started the nifi. I have setup the remote debugger in eclipse and enabled breakpoint on first line of onTrigger(). While debugging I am running one processor at a time in my nifi pipeline. I can find single flow file in the input queue of my custom processor, however my custom processor is not receiving any flow file. When I start my custom processor, it hits breakpoint inside onTrigger() method. Inside thie method, when I do:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles turns out to be of size zero!!! I am not able to guess in which direction should I check to find why this is happening. Any hint how I can diagnose this?

Edit

Stacktrace

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1: This method returns immediately from inside if's body, which gives me following exception:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified

This exceptions keeps recurring forever, since flow file in the input queue of my custom processor.

PS2: I am getting following error in the apps.log, though I am unsure if this is the source of the problem:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

回答1:

It is normal behavior to sometimes get zero flow files, which is why processors have the check that you have at the beginning.

The FlowFileHandlingException means that a flow file was obtained from the session, either from get or create, and that flow file was not transferred anywhere and was not removed, so basically it is unaccounted for. This could not happen from just returning at the beginning in that if statement, so the rest of the processor code is executing and producing this error. You haven't provided the rest of the code so we can't see the problem.

The second issue is fairly self-explanatory. You have a process group under version control, but the registry client that was used to start version control somehow no longer exists. I don't know how you created this scenario because I believe the UI/API won't let you delete a registry client that has active flows under version control, but you should be able to stop version control on the process group.