Nifi- Parallel and concurrent execution with Execu

Published 2019-06-27 12:17

Question:

Currently, I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flowfiles and I set Concurrent Tasks to 10 for the ExecuteStreamCommand processor. Do I get only concurrent execution, or both concurrent and parallel execution?

Answer 1:

In this case you get both concurrency and parallelism, as noted in the Apache NiFi User Guide:

Next, the Scheduling Tab provides a configuration option named Concurrent tasks. This controls how many threads the Processor will use. Said a different way, this controls how many FlowFiles should be processed by this Processor at the same time. Increasing this value will typically allow the Processor to handle more data in the same amount of time. However, it does this by using system resources that then are not usable by other Processors. This essentially provides a relative weighting of Processors — it controls how much of the system’s resources should be allocated to this Processor instead of other Processors. This field is available for most Processors. There are, however, some types of Processors that can only be scheduled with a single Concurrent task.

If the command you are invoking has locking issues or race conditions, this could be problematic, but if the invocations are independent of one another, you are limited only by JVM scheduling and hardware performance.
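As a rough analogy only (this is not how NiFi schedules processors internally), Concurrent Tasks behaves like the size of a worker pool: each worker takes the next flowfile and runs the command on it independently of the others. The sketch below uses Python's `concurrent.futures.ThreadPoolExecutor`; `COMMAND`, `run_command` and `process_flowfiles` are hypothetical names, and the "command" is just a child Python process that upper-cases its stdin, standing in for whatever ExecuteStreamCommand would run.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the external command ExecuteStreamCommand would run:
# a child Python process that upper-cases whatever arrives on stdin.
COMMAND = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]


def run_command(content: str) -> str:
    """Run one independent command invocation for one flowfile's content."""
    result = subprocess.run(COMMAND, input=content, capture_output=True, text=True, check=True)
    return result.stdout


def process_flowfiles(contents: list[str], concurrent_tasks: int) -> list[str]:
    """Process every flowfile with at most `concurrent_tasks` commands in flight."""
    # Each worker thread blocks while its child process runs, so up to
    # `concurrent_tasks` child processes can run at once; the OS spreads
    # them across the available cores.
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
        # map() returns results in input order, regardless of finish order.
        return list(pool.map(run_command, contents))


if __name__ == "__main__":
    flowfiles = [f"flowfile {i}" for i in range(20)]
    outputs = process_flowfiles(flowfiles, concurrent_tasks=10)
    print(outputs[:2])  # ['FLOWFILE 0', 'FLOWFILE 1']
```

Because each invocation only touches its own input and output, the workers never contend with each other; if the command shared a file or lock, that independence (and the easy scaling) would disappear.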

Response to a question in the comments (too long for a comment):

Question:

Thanks, Andy. When there are 4 cores, can I assume that there will be 4 parallel executions, within which multiple threads run to handle the 10 concurrent tasks? In the best case, how are these 20 flowfiles executed in the scenario I described? – John

Response:

John, JVM thread handling is a fairly complex topic, but yes: in general there would be n + C JVM threads, where C is some constant (main thread, VM thread, GC threads) and n is the number of "individual" threads created by the flow controller to execute the processor tasks. JVM threads map 1:1 to native OS threads, so on a 4-core system with 10 processor threads running, you would have "4 parallel executions". My understanding is that, at a high level, the OS uses time slicing to cycle through the 10 threads 4 at a time, and each thread processes ~2 flowfiles.

Again, this is a very rough model (assume 1 flowfile = 1 unit of work = 1 second):

Cores | Threads | Flowfiles/thread | Relative time
  1   |    1    |         20       |      20 s      (normal)
  4   |    1    |         20       |      20 s      (wasting 3 cores)
  1   |    4    |          5       |      20 s      (time slicing 1 core for 4 threads)
  4   |    4    |          5       |       5 s      (1:1 thread to core ratio)
  4   |   10    |          2       |       5+x s    (see execution table below)
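Every row of the table above follows one idealized rule: only min(cores, threads) threads can actually run at any instant, so the total time is the total work divided by that number. A small sketch of the rule, assuming (as the table does) that the work divides evenly and ignoring context-switch overhead, i.e. the "+x" in the last row; the function name is hypothetical:

```python
def estimated_total_time(cores: int, threads: int, flowfiles: int,
                         seconds_per_flowfile: float = 1.0) -> float:
    """Idealized wall-clock time for a batch of CPU-bound flowfiles.

    Extra threads beyond the core count add no parallelism (they just take
    turns), and extra cores beyond the thread count sit idle, so the usable
    parallelism is min(cores, threads). Assumes the work divides evenly and
    ignores context-switch overhead.
    """
    if cores < 1 or threads < 1:
        raise ValueError("cores and threads must be positive")
    effective_parallelism = min(cores, threads)
    return flowfiles * seconds_per_flowfile / effective_parallelism


if __name__ == "__main__":
    # Reproduce the five rows of the table for 20 flowfiles.
    for cores, threads in [(1, 1), (4, 1), (1, 4), (4, 4), (4, 10)]:
        print(cores, threads, estimated_total_time(cores, threads, flowfiles=20))
```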

Assume each core runs one thread at a time, each thread processes 1 flowfile per second, and each thread gets 1 second of uninterrupted execution per turn (obviously unrealistic). The execution sequence might then look like this:

Flowfiles A - T

Cores α, β, γ, δ

Threads 1 - 10

Time/thread 1 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
  1  |   5/E  |   6/F  |   7/G  |   8/H
  2  |   9/I  |  10/J  |   1/K  |   2/L
  3  |   3/M  |   4/N  |   5/O  |   6/P
  4  |   7/Q  |   8/R  |   9/S  |  10/T

In 5 seconds, all 10 threads have executed twice, each completing 2 flowfiles.

Now assume the thread scheduler gives each thread a time slice of only 0.5 seconds per turn (again, not a realistic number; it is just for demonstration). The execution pattern would then be:

Flowfiles A - T

Cores α, β, γ, δ

Threads 1 - 10

Time/thread .5 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
 .5  |   5/E  |   6/F  |   7/G  |   8/H
  1  |   9/I  |  10/J  |   1/A  |   2/B
1.5  |   3/C  |   4/D  |   5/E  |   6/F
  2  |   7/G  |   8/H  |   9/I  |  10/J
2.5  |   1/K  |   2/L  |   3/M  |   4/N
  3  |   5/O  |   6/P  |   7/Q  |   8/R
3.5  |   9/S  |  10/T  |   1/K  |   2/L
  4  |   3/M  |   4/N  |   5/O  |   6/P
4.5  |   7/Q  |   8/R  |   9/S  |  10/T

In this case, the total execution time is the same (ignoring some overhead from thread switching), but individual flowfiles take "longer" to complete, measured as elapsed time from 0 rather than active execution time. For example, flowfiles C and D are not complete until time = 2 in the second scenario, but are complete at time = 1 in the first.
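Both execution tables can be reproduced by a toy round-robin scheduler, which also makes the completion-time comparison easy to check. This is a sketch under the same simplifying assumptions as the tables (strict round-robin over threads 1 to 10, 4 cores, fixed time slices, a thread pulls the next queued flowfile when it has none); real OS schedulers do not work this way, and `simulate` is a hypothetical name. `fractions.Fraction` keeps 0.5 s steps exact.

```python
from collections import deque
from fractions import Fraction
from string import ascii_uppercase


def simulate(cores: int, threads: int, flowfiles: int, time_slice: Fraction,
             work_per_flowfile: Fraction = Fraction(1)):
    """Toy round-robin scheduler reproducing the two execution tables.

    Threads take turns in the fixed order 1..threads, `cores` turns at a time,
    and each turn lasts `time_slice`. A thread with no flowfile in hand pulls
    the next one from the queue at the start of its turn.

    Returns (schedule, completed): `schedule` maps each start time to the
    "thread/flowfile" label running on each core, and `completed` maps each
    flowfile to the time it finished.
    """
    if not 0 < flowfiles <= len(ascii_uppercase):
        raise ValueError("flowfiles must be between 1 and 26 (labels are A-Z)")
    if cores < 1 or threads < 1 or time_slice <= 0:
        raise ValueError("cores, threads and time_slice must be positive")
    queue = deque(ascii_uppercase[:flowfiles])
    current = [None] * threads           # flowfile each thread holds, if any
    remaining = [Fraction(0)] * threads  # work left on that flowfile
    schedule: dict[Fraction, list[str]] = {}
    completed: dict[str, Fraction] = {}
    slot = 0
    while len(completed) < flowfiles:
        thread = slot % threads               # strict round-robin over threads
        start = (slot // cores) * time_slice  # `cores` turns share each time step
        if current[thread] is None and queue:
            current[thread] = queue.popleft()
            remaining[thread] = work_per_flowfile
        # Simplification: an idle thread still consumes its turn. In the table
        # scenarios every thread finishes at the same time, so none is wasted.
        label = "idle"
        if current[thread] is not None:
            label = f"{thread + 1}/{current[thread]}"
            remaining[thread] -= time_slice
            if remaining[thread] <= 0:
                completed[current[thread]] = start + time_slice
                current[thread] = None
        schedule.setdefault(start, []).append(label)
        slot += 1
    return schedule, completed


if __name__ == "__main__":
    for time_slice in (Fraction(1), Fraction(1, 2)):
        schedule, completed = simulate(cores=4, threads=10, flowfiles=20, time_slice=time_slice)
        print(f"time slice = {time_slice} s")
        for start, row in schedule.items():
            print(f"{float(start):>4} | " + " | ".join(f"{cell:>5}" for cell in row))
        print(f"C done at {completed['C']}, all done at {max(completed.values())}")
```

With a 1 s slice, C finishes at 1; with a 0.5 s slice it finishes at 2, while the last flowfile finishes at 5 in both runs, matching the two tables.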

To be honest, the OS and JVM have people much smarter than me working on this, as does our project (luckily), so these are gross oversimplifications, and in general I would recommend letting the system worry about hyper-optimizing the threading. I would not expect setting Concurrent Tasks to 10 to yield a large improvement over setting it to 4 in this case.

I just ran a quick test on my local 1.5.0 development branch: I connected a simple GenerateFlowFile processor running on a 0 sec schedule to a LogAttribute processor. GenerateFlowFile immediately produces so many flowfiles that the queue triggers back pressure (pausing the upstream processor until the queue can drain some of the 10,000 waiting flowfiles). I stopped both and re-ran the test, giving LogAttribute more concurrent tasks. With LogAttribute set to twice as many concurrent tasks as GenerateFlowFile (a 2:1 ratio), the queue never built up past about 50 flowfiles.
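Back pressure is essentially a bounded producer/consumer queue: once the queue reaches its threshold, the producer is paused until consumers drain it. A minimal Python sketch of that mechanism, assuming `queue.Queue(maxsize=...)` as the stand-in for the connection (its `put()` blocks while the queue is full) and a hypothetical `run_pipeline` helper whose `consumers` argument plays the role of LogAttribute's concurrent tasks. It illustrates the blocking behaviour only; it does not reproduce NiFi's implementation or the queue depths from the test above, and in CPython extra consumer threads doing pure-Python work gain little throughput because of the GIL.

```python
import queue
import threading

SENTINEL = object()  # tells a consumer thread to stop


def run_pipeline(items: int, consumers: int, threshold: int = 10_000) -> tuple[int, int]:
    """Produce `items` objects into a bounded queue drained by `consumers` threads.

    Returns (consumed, peak_depth). put() blocks while the queue holds
    `threshold` items, which is the back-pressure behaviour: the producer
    pauses until there is room again.
    """
    q: queue.Queue = queue.Queue(maxsize=threshold)
    consumed = 0
    peak = 0
    lock = threading.Lock()

    def consumer() -> None:
        nonlocal consumed
        while True:
            item = q.get()
            if item is SENTINEL:
                return
            with lock:
                consumed += 1  # stand-in for the downstream processor's work

    workers = [threading.Thread(target=consumer) for _ in range(consumers)]
    for w in workers:
        w.start()
    for i in range(items):
        q.put(i)                     # blocks while the queue is full
        peak = max(peak, q.qsize())  # approximate under concurrency, never above threshold
    for _ in workers:
        q.put(SENTINEL)              # one stop signal per consumer, queued after all items
    for w in workers:
        w.join()
    return consumed, peak


if __name__ == "__main__":
    consumed, peak = run_pipeline(items=100_000, consumers=2, threshold=10_000)
    print(f"consumed {consumed}, peak queue depth {peak}")
```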

tl;dr Setting your concurrent tasks to the number of cores you have should be sufficient.

Update 2:

I checked with one of our resident JVM experts, and he mentioned two things to note:

  1. The command is not necessarily CPU-bound; if it is I/O-heavy, more concurrent tasks may be beneficial.
  2. The maximum number of threads for the entire flow controller (the Maximum Timer Driven Thread Count controller setting) is 10 by default.
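Point 1 can be seen with a toy experiment: when each task mostly waits (on disk, network, or a child process) instead of computing, threads beyond the core count still help, because a waiting thread does not occupy a core. In the sketch below `time.sleep` stands in for I/O wait, which is an assumption (real commands mix CPU and I/O). The hypothetical `suggested_tasks` helper applies a common sizing rule of thumb popularized by *Java Concurrency in Practice*, threads ≈ cores × (1 + wait time / compute time), assuming full CPU utilization is the target; it is a starting point for tuning, not a NiFi setting.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_task(wait_seconds: float) -> None:
    """Simulated I/O-heavy command: almost all of its time is spent waiting."""
    time.sleep(wait_seconds)


def wall_time(tasks: int, concurrent_tasks: int, wait_seconds: float = 0.05) -> float:
    """Seconds needed to finish `tasks` simulated I/O tasks with a pool of the given size."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
        list(pool.map(io_bound_task, [wait_seconds] * tasks))
    return time.perf_counter() - start


def suggested_tasks(cores: int, wait_time: float, compute_time: float) -> int:
    """Rule-of-thumb pool size: cores * (1 + wait/compute), at least 1."""
    return max(1, round(cores * (1 + wait_time / compute_time)))


if __name__ == "__main__":
    cores = os.cpu_count() or 4
    for n in (2, cores, 10):
        print(f"{n:>3} concurrent tasks: {wall_time(20, n):.2f} s")
    # A purely CPU-bound command (no waiting) gives back the core count:
    print(suggested_tasks(cores=4, wait_time=0.0, compute_time=1.0))  # 4
    # A command that waits 3 ms for every 1 ms of CPU on 4 cores:
    print(suggested_tasks(cores=4, wait_time=3.0, compute_time=1.0))  # 16
```

For a CPU-bound command the rule reduces to "one task per core", which is consistent with the tl;dr above; the more the command waits, the more concurrent tasks it can usefully take, up to the flow controller's thread limit.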