Why python multiprocessing takes more time than se

Posted 2019-01-15 17:31

Question:

I was trying out the Python multiprocessing module. In the code below, the serial execution time is 0.09 seconds and the parallel execution time is 0.2 seconds. As I am getting no speedup, I think I might be going wrong somewhere.

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

My system has four cores. Please let me know of ways I can speed up this code. Also, what are my options for parallel programming with Python (other than the multiprocessing module)?

Thanks and Regards

Answer 1:

out_queue.get() blocks until a result is available by default. So you are essentially starting a process and waiting for it to finish before starting the next process. Instead, start all the processes, then get all the results.

Example:

    #!python2
    import multiprocessing as mp
    from random import uniform, randrange
    import time

    def flop_no(rand_nos, a, b):
        cals = []
        for r in rand_nos:
            cals.append(r + a * b)
        return cals

    def flop(val, a, b, out_queue):
        cals = []
        for v in val:
            cals.append(v + a * b)
        out_queue.put(cals)
        # time.sleep(3)

    def concurrency():
        out_queue = mp.Queue()
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        print len(rand_nos)
        # for i in range(5):
        start_time = time.time()
        p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
        p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
        p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
        p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))

        p1.start()
        p2.start()
        p3.start()
        p4.start()

        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())

        p1.join()
        p2.join()
        p3.join()
        p4.join()

        print "Running time parallel: ", time.time() - start_time, "secs"

    def no_concurrency():
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        start_time = time.time()
        cals = flop_no(rand_nos, a, b)
        print "Running time serial: ", time.time() - start_time, "secs"

    if __name__ == '__main__':
        concurrency()
        no_concurrency()
        # print "Program over" 

Output:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  3.54999995232  secs
    Running time serial:    0.203000068665 secs

Note that parallel time is still slower.  This is due to the overhead of starting 4 other Python processes.  Your processing time for the whole job is only .2 seconds.  The 3.5 seconds for parallel is mostly just starting up the processes.  Note the commented out `# time.sleep(3)` above in `flop()`.  Add that code in and the times are:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  6.50900006294  secs
    Running time serial:    0.203000068665 secs

The overall time only got 3 seconds slower (not 12) because the four sleeps ran in parallel.  You need a lot more data to make parallel processing worthwhile.
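
To put a number on that start-up overhead on your own machine, something like the following Python 3 sketch may help. It times starting and joining processes that do no work at all. `noop` and `time_startup` are illustrative names, not part of the code above, and the figures will vary with the operating system and the start method (spawning a fresh interpreter, as on Windows, tends to cost far more than forking).

```python
import multiprocessing as mp
import time


def noop():
    """Do nothing, so that only process start-up and tear-down is timed."""
    return None


def time_startup(n_procs):
    """Return the seconds needed to start and join n_procs idle processes."""
    procs = [mp.Process(target=noop) for _ in range(n_procs)]
    t0 = time.perf_counter()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.perf_counter() - t0


if __name__ == "__main__":
    # Compare these figures with the time the real work takes serially.
    for n in (1, 2, 4):
        print("%d idle process(es): %.4f s" % (n, time_startup(n)))
```

If the time reported for four idle processes is already larger than the serial running time, no amount of splitting the work four ways can pay off for that data size.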

Here's a version where you can visually see how long it takes to start the processes.  "here" is printed as each process begins to run `flop()`.  An event is used to start all processes at the same time, and only the processing time is counted:

#!python2

import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5) # Wait for processes to start.  See multiprocessing.Barrier in Python 3.3+ for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"

    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

Output:

1000000
here           # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel:  0.171999931335 secs
Running time serial:    0.203000068665 secs

Now, the processing time got faster. Not by a lot...probably due to the interprocess communication to get the results.
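
On Python 3, the fixed `time.sleep(5)` can be replaced by a barrier, as the comment in the code hints. The following is a minimal sketch of that idea, not a drop-in replacement: `split` is a hypothetical helper introduced here, and the barrier is sized for the workers plus the parent so that the clock starts only once every worker is ready.

```python
import multiprocessing as mp
import time
from random import uniform


def split(seq, n):
    """Split seq into n contiguous chunks of near-equal length."""
    size, extra = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        chunks.append(seq[start:end])
        start = end
    return chunks


def flop(val, a, b, out_queue, barrier):
    barrier.wait()  # released only when all workers and the parent have arrived
    out_queue.put([v + a * b for v in val])


def concurrency(n_workers=4, n_items=1000000):
    a, b = 3.3, 4.4
    rand_nos = [uniform(1, 4) for _ in range(n_items)]
    out_queue = mp.Queue()
    barrier = mp.Barrier(n_workers + 1)  # the workers plus the parent
    procs = [mp.Process(target=flop, args=(chunk, a, b, out_queue, barrier))
             for chunk in split(rand_nos, n_workers)]
    for p in procs:
        p.start()
    barrier.wait()  # returns once every worker is up and waiting
    start_time = time.perf_counter()
    results = [out_queue.get() for _ in procs]  # drain the queue before joining
    elapsed = time.perf_counter() - start_time
    for p in procs:
        p.join()
    return elapsed, sum(len(r) for r in results)


if __name__ == "__main__":
    elapsed, n_results = concurrency()
    print("Running time parallel: %.4f secs for %d results" % (elapsed, n_results))
```

The measured time still includes pickling each result list and sending it back through the queue, which is the interprocess communication cost mentioned above.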



Answer 2:

Love is a passion . . . but can hurt a lot, if one's belief is just blind or naive to evidence

I love Python for its ease of use and for its universality, yet getting towards HPC performance requires more: hardware-related insight and optimisation effort also need to be put in.

@RupjitChakraborty as you may see in my answer below, the same result can be obtained in pure-[SERIAL] code ~50x faster than in your best case and ~100x faster than Mark's reported time. Feel free to re-test it on your hardware, so as to have the same platform for a more rigorous comparison of performance readings. Nevertheless, enjoy the hunt for performance! – user3666197 Dec 1 '17 at 13:39

If I may put a few cents into this never-ending hunt for performance:
- try to understand well both the original Amdahl's Law and its newer overhead-strict re-formulation
- try to quantify well the add-on overhead costs of process management
- try to quantify well the add-on overhead costs of large data transfers ( a one-stop cost )
- try to avoid any and all potential (b)locking; some may be hidden "behind" the constructors used
- try to avoid any synchronisation + communication overhead costs that are unrelated to the processing itself
- try to prevent CPU-core cache misses and to minimise coherence losses ( easy to say, hard to code: manually crafted code often performs better than a simple one-liner built on some highly abstracted syntax constructor, whose costs you cannot manage, because you can make better cache-related decisions under your own control than a context-unaware, pre-fabricated, universal code transformation that knows nothing of your particular priorities )
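
To make the first point concrete, here is a small sketch of Amdahl's Law with an extra overhead term. The classic law gives speedup = 1 / ((1 - p) + p / n) for a parallel fraction p on n workers. The `overhead` term below is an assumption of this sketch, one simple way to account for add-on costs, and not necessarily the exact re-formulation referred to above. `amdahl_speedup` is an illustrative name.

```python
def amdahl_speedup(p, n, overhead=0.0):
    """Estimated speedup of a job on n workers.

    p        -- fraction of the serial runtime that can run in parallel (0..1)
    n        -- number of workers (>= 1)
    overhead -- ASSUMPTION of this sketch: add-on costs (process start-up,
                data transfer, result collection) expressed as a fraction
                of the serial runtime
    """
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must lie in [0, 1]")
    if n < 1:
        raise ValueError("n must be at least 1")
    if overhead < 0.0:
        raise ValueError("overhead must not be negative")
    return 1.0 / ((1.0 - p) + p / n + overhead)


if __name__ == "__main__":
    # The same job looks very different once the add-on costs are counted.
    for overhead in (0.0, 0.25, 1.0, 10.0):
        print("overhead = %5.2f -> speedup on 4 workers: %.3f"
              % (overhead, amdahl_speedup(0.95, 4, overhead)))
```

Any result below 1 means the "parallel" version is slower than the serial one, which is what happens when the add-on costs are several times the serial runtime, as in the timings reported in this thread.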


Want speedup?
Always systematically test individual factors in isolation:

For a brief view into the actual costs your code will pay ( in [us] ): never guess, test it.

Test-case A: measures process-management [SERIAL]-process-scheduling add-on costs
Test-case B: measures remote process memory allocation add-on costs
Test-case C: measures remote process [CONCURRENT]-process-scheduling computing costs
Test-case D: measures remote process workloads impact on [CONCURRENT] scheduling costs

For details,
one may read further and re-use / improve naive code templates
in chapter [ The Architecture, Resources and Process-scheduling facts that matter ].

As Mark has already warned, another cost entering the overhead-strict Amdahl's Law speedup calculation comes from data transfers from the main process to each of the spawned subprocesses. These pure-[SERIAL] add-on overheads grow more than linearly with data volume, due to colliding access patterns, contention for physical resource capacity, signalling and (b)locking overheads on shared objects, and similar unavoidable hardware obstacles.

Before going any deeper into performance-tweaking options, here is an easy Test-case E for measuring this very class of memory-data-transfer add-on costs:

def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             but to be able to benchmark
                             add-on overhead costs
                             raised by a need to transfer
                             some large amount of data
                             from a main()-process
                             to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
###  A NAIVE TEST BENCH
##############################################################
import joblib
from zmq import Stopwatch; aClk = Stopwatch()
JOBS_TO_SPAWN =  4         # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10         # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6       # TUNE: +6, +7, +8,  +9, +10, ..

DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
     aClk.start()
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
     joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                   for ( a_FAT_DATA )
                                   in  [       DATA_TO_XFER
                                         for _ in range( RUNS_TO_RUN )
                                         ]
                         )
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
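except Exception as anException:
     # report the failure, so that a crashed run is not mistaken for a fast one
     print( "CODE_UNDER_TEST failed: {0!r}".format( anException ) )
finally:
     try:
         _ = aClk.stop()
     except Exception:
         _ = -1                # the stopwatch could not be read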

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f}[MB] )"

print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_XFER / 1024. /1024.
                        )
       )
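
Test-case E above needs joblib and pyzmq. For a first, dependency-free estimate of one component of that cost, one can time a pickle round trip of the payload, since multiprocessing pickles the objects it sends between processes. This is only a sketch under that assumption: `pickle_roundtrip_seconds` is an illustrative name, and the figure leaves out the pipe transfer and any process start-up, so it is a lower bound rather than the full cost.

```python
import pickle
import time


def pickle_roundtrip_seconds(payload, repeats=5):
    """Return the best-of-`repeats` time to serialise and deserialise payload."""
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        pickle.loads(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))
        best = min(best, time.perf_counter() - t0)
    return best


if __name__ == "__main__":
    # Watch how the cost scales with the size of the data being shipped.
    for size in (10**4, 10**5, 10**6):
        data = [1 for _ in range(size)]
        print("%8d items: %.6f s per round trip"
              % (size, pickle_roundtrip_seconds(data)))
```

Comparing this figure with the time the actual computation takes on the same data gives a quick hint of whether shipping the data to another process can be worthwhile at all.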

"Please let me know of ways I can speed up this code."

  • learn about numba; this tool is definitely worth knowing for boosting performance
  • learn about vectorisation of operations
  • after mastering these two, you might look into re-formulating the already-tuned code in Cython

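import numpy as np

a, b = 3.3, 4.4                                  # the constants used in the question
rVEC = np.random.uniform( 1, 4, int( 1E+6 ) )    # the size must be an integer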

def flop_NaivePY( r, a, b ):
    return(       r+(a *b ) )

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?

Yet this code is still awfully wrong if you care about performance.

Let's turn on numpy in-place assignments, avoiding duplicate memory allocations and similar processing-inefficiencies:

def flop_InplaceNUMPY( r, a, b ):
       r += a * b
       return r

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT ANY MORE
2.~_____[ms] @ 1.000.000 FLOAT-OPS, HALF THE TIME, BETTER!
             BUT ALSO TEST THE SCALING: ONCE THE DATA NO LONGER FITS IN CACHE,
             THAT TEST WILL SHOW WHETHER THE CODE DESIGN NEEDS OPTIMISING

Cautious experimenters will soon find that the naive code can even get the Python process killed: on larger sizes ( above ~1E+9 ) the memory-allocation requests can no longer be satisfied and the process terminates.

All this puts otherwise pure-[SERIAL] code on steroids, at close to zero add-on cost, and uncle Gene Amdahl will reward to the max the process-scheduling and hardware-architecture knowledge and effort you spent during code design.

No better advice exists . . . except going into a pure clairvoyance business, where re-testing is never available