I was trying out the Python multiprocessing module. In the code below, the serial execution time is 0.09 seconds and the parallel execution time is 0.2 seconds. As I am getting no speedup, I think I might be going wrong somewhere.
import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"

def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p3.start()
    out_queue.get()
    p4.start()
    out_queue.get()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"
My system has four cores. Please let me know of ways I can speed up this code. Also, what are my options for parallel programming with Python (other than the multiprocessing module)?
Thanks and Regards
`out_queue.get()` blocks by default until a result is available. So you are essentially starting a process and waiting for its result before starting the next process. Instead, start all the processes, then get all the results.
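To see the ordering effect in isolation, here is a minimal Python 3 sketch. It substitutes threads and `time.sleep()` for the real worker processes, an assumption made only so the timing is not swamped by process start-up. `queue.Queue.get()` blocks in the same way as `multiprocessing.Queue.get()`. The function names are hypothetical.

```python
import queue
import threading
import time

def worker(out_q, delay):
    # Stand-in for flop(): "compute" for `delay` seconds, then put a result.
    time.sleep(delay)
    out_q.put(delay)

def start_then_get_each(n, delay):
    """The question's pattern: start one worker, block on get(), then start the next."""
    out_q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(out_q, delay)) for _ in range(n)]
    t0 = time.perf_counter()
    for t in threads:
        t.start()
        out_q.get()          # blocks until THIS worker has delivered its result
    for t in threads:
        t.join()
    return time.perf_counter() - t0

def start_all_then_get_all(n, delay):
    """The fixed pattern: start every worker first, then collect n results."""
    out_q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(out_q, delay)) for _ in range(n)]
    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for _ in range(n):
        out_q.get()          # results may arrive in any order
    for t in threads:
        t.join()
    return time.perf_counter() - t0

if __name__ == "__main__":
    print("serialised:", round(start_then_get_each(4, 0.2), 2), "s")
    print("overlapped:", round(start_all_then_get_all(4, 0.2), 2), "s")
```

With four workers the first pattern takes roughly four times the per-worker delay and the second takes roughly one delay, because only the second lets the workers overlap.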
Example:
#!python2
import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    # time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"
Output:
1000000
250000
250000
250000
250000
Running time parallel: 3.54999995232 secs
Running time serial: 0.203000068665 secs
Note that the parallel time is still slower. This is due to the overhead of starting 4 other Python processes. Your processing time for the whole job is only 0.2 seconds; the 3.5 seconds for the parallel version is mostly just starting up the processes. Note the commented-out `# time.sleep(3)` above in `flop()`. Add that code in and the times are:
1000000
250000
250000
250000
250000
Running time parallel: 6.50900006294 secs
Running time serial: 0.203000068665 secs
The overall time only grew by about 3 seconds (not 12) because the four sleeps ran in parallel. You need a lot more data to make parallel processing worthwhile.
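How much more data? A deliberately simple model, which is an assumption and not something measured above, treats the parallel run time as a fixed overhead `O` plus the serial work `T` split perfectly across `n` workers. Parallel wins only when `O + T/n < T`, that is, when `T > O * n / (n - 1)`. The Python 3 function names below are hypothetical.

```python
def parallel_time(t_serial, n_workers, overhead):
    """Toy model: fixed overhead (start-up, transfers) plus perfectly divided work."""
    return overhead + t_serial / n_workers

def break_even_serial_time(n_workers, overhead):
    """Serial run time above which the parallel version starts to win in the toy model."""
    if n_workers < 2:
        raise ValueError("need at least two workers to gain anything")
    return overhead * n_workers / (n_workers - 1)

# Using the first output above: 3.55 s parallel vs 0.2 s serial suggests
# O is roughly 3.55 - 0.2/4, i.e. about 3.5 s, with n = 4 processes.
print(round(break_even_serial_time(4, 3.5), 2))  # → 4.67
```

Under this model the job would need about 4.7 s of serial work, more than 20 times the 0.2 s measured, before four processes pay off. The real threshold is likely higher, because the data-transfer part of the overhead grows with the data instead of staying fixed.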
Here's a version where you can see how long it takes to start the processes. "here" is printed as each process begins to run `flop()`. An event is used to start all the processes at the same time, and only the processing time is counted:
#!python2
import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5)  # Wait for processes to start. See multiprocessing.Barrier (Python 3.3+) for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"
    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"
Output:
1000000
here # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel: 0.171999931335 secs
Running time serial: 0.203000068665 secs
Now the parallel processing time is faster, though not by a lot, probably because of the interprocess communication needed to send the results back.
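The `time.sleep(5)` in the code above only guesses when the workers are ready. A barrier replaces that guess. The sketch below is Python 3 and, so that it runs in a single process, uses `threading.Barrier`. `multiprocessing.Barrier` is documented as a clone of it, and for real processes you would create `mp.Barrier(n + 1)` and pass it in `args`, just as the `mp.Event` is passed above. The helper names are hypothetical.

```python
import threading
import time

def flop_worker(values, a, b, results, ready):
    # Any slow per-worker start-up work would happen before this point.
    ready.wait()                                   # block until every party has arrived
    results.append(len([v + a * b for v in values]))

def run(n_workers=4, size=1000):
    ready = threading.Barrier(n_workers + 1)       # the workers plus the main thread
    results = []
    data = [float(i) for i in range(size)]
    chunks = [data[i::n_workers] for i in range(n_workers)]  # strided split covers all items
    threads = [threading.Thread(target=flop_worker, args=(chunk, 3.3, 4.4, results, ready))
               for chunk in chunks]
    for t in threads:
        t.start()
    ready.wait()                                   # returns only once all workers are waiting too
    t0 = time.perf_counter()                       # so the clock starts when everyone is ready
    for t in threads:
        t.join()
    return sorted(results), time.perf_counter() - t0

if __name__ == "__main__":
    print(run())
```

Unlike a fixed sleep, the barrier never waits too little, which would count start-up time, or too long, which would waste wall time.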
Love is a passion . . . but it can hurt a lot if one's belief is blind or naive to evidence
I love Python for its ease of use and its universality; yet getting towards HPC performance requires more: hardware-related insight and optimisation-tweaking effort also need to be put in.
@RupjitChakraborty as you might enjoy in my answer below, the same result could be obtained in pure-[SERIAL] code ~50x faster than in your best case and ~100x faster than Mark's reported time. Feel free to re-test it on your hardware, so as to have the same platform for a somewhat more rigorous comparison of performance readings. Nevertheless, enjoy the hunt for performance! – user3666197 Dec 1 '17 at 13:39
If I may add a few cents to this never-ending hunt for performance:
- try to understand well both the original Amdahl's Law and its newer, overhead-strict re-formulation
- try to quantify well the add-on overhead costs that appear in process management
- try to quantify well the add-on overhead costs of large data transfers ( a one-stop cost )
- try to avoid any and all potential (b)locking; some of it may be hidden "behind" the constructors used
- try to avoid any processing-unrelated overhead costs of synchronisation and communication
- try to prevent CPU-core cache misses and minimise coherence losses as far as possible ( yes, easy to say, hard to code: a manually crafted code often beats a simple one-liner built on some highly abstracted syntax constructor, whose costs one cannot manage, because you can take better cache-related decisions under your own control than a context-unaware, pre-fabricated universal code transformation, unrelated to your particular priorities, will make for you )
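For the first point in the list above, the classical law is `S = 1 / ((1 - p) + p / N)`, where `p` is the parallelisable fraction of the serial run time and `N` is the number of workers. The second function below is a simplified, overhead-aware variant. It is an assumption for illustration, not the exact published re-formulation: add-on costs, expressed as a fraction of the original serial run time, are paid on top of everything else. The Python 3 function names are hypothetical.

```python
def amdahl_speedup(p, n):
    """Classical Amdahl's Law: p = parallelisable fraction, n = number of workers."""
    return 1.0 / ((1.0 - p) + p / n)

def amdahl_speedup_with_overhead(p, n, overhead):
    """Simplified overhead-aware variant (an illustrative assumption):
    `overhead` = set-up + data-transfer + termination costs, expressed as a
    fraction of the original serial run time, paid serially on top of the work."""
    return 1.0 / ((1.0 - p) + overhead + p / n)

print(round(amdahl_speedup(0.9, 4), 3))                      # → 3.077
# Roughly the question's situation: ~3.5 s of overhead on a 0.2 s job -> overhead = 17.5
print(round(amdahl_speedup_with_overhead(1.0, 4, 17.5), 3))  # → 0.056, i.e. ~18x slower
```

A speedup below 1 means slower than the serial code, which is what the measurements in this thread show.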
Want speedup?
Always test individual factors systematically, in isolation.
As for a brief view into the actual costs your code will pay ( in [us] ): never guess, test it.
Test-case A: measures process-management [SERIAL]-process-scheduling add-on costs
Test-case B: measures remote-process memory-allocation add-on costs
Test-case C: measures remote-process [CONCURRENT]-process-scheduling computing costs
Test-case D: measures the impact of remote-process workloads on [CONCURRENT] scheduling costs
For details, one may read further and re-use / improve naive code templates in chapter [ The Architecture, Resources and Process-scheduling facts that matter ].
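In the spirit of "never guess, test it", a small stdlib-only timing helper can run a baseline before any of the test-cases above. The minimal Python 3.7+ sketch below uses `time.perf_counter_ns()` in place of the `zmq.Stopwatch` used further down. It reports both the minimum and the median, because single runs are noisy. The helper name `measure_us` is hypothetical.

```python
import statistics
import time

def measure_us(fn, *args, runs=20):
    """Call fn(*args) `runs` times; return (min, median) wall-clock time in microseconds."""
    samples = []
    for _ in range(runs):
        t0 = time.perf_counter_ns()
        fn(*args)
        samples.append((time.perf_counter_ns() - t0) / 1000.0)
    return min(samples), statistics.median(samples)

if __name__ == "__main__":
    # Baseline: the cost of calling a do-nothing function, i.e. the measurement floor.
    print("no-op   :", measure_us(lambda: None))
    print("sleep 5ms:", measure_us(time.sleep, 0.005, runs=5))
```

Any add-on cost you later attribute to process management or data transfer should be clearly above this floor before you draw conclusions from it.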
As Mark has already warned, a further cost in the overhead-strict Amdahl's Law speedup calculation will come from data transfers from the main process towards each of the spawned subprocesses. There, pure-[SERIAL] add-on overheads will, and do, grow more than linearly with data volume, due to colliding access patterns, contention for physical resource capacity, shared-object signalling-(b)locking overheads, and similar hardware-unavoidable obstacles.
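One component of that transfer cost is easy to isolate. `multiprocessing` serialises the objects put on a `Queue` with `pickle`, and it also pickles `Process` arguments under the spawn start method. The Python 3 sketch below (the helper name is hypothetical) only measures pickled size and serialisation time. It does not reproduce the contention effects described above, so treat it as a lower bound on the real cost.

```python
import pickle
import time

def pickle_cost(n_items):
    """Pickle a list of n floats; return (pickled size in bytes, time in microseconds)."""
    payload = [float(i) for i in range(n_items)]
    t0 = time.perf_counter_ns()
    blob = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
    elapsed_us = (time.perf_counter_ns() - t0) / 1000.0
    return len(blob), elapsed_us

if __name__ == "__main__":
    for n in (10**4, 10**5, 10**6):
        size, us = pickle_cost(n)
        print(f"{n:>9,d} floats -> {size / 2**20:8.2f} MiB pickled in {us:10.0f} us")
```

Each 250000-element result list in the question has to make this trip back through the queue, in addition to being unpickled on the receiving side.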
Before going any deeper into performance-tweaking options, one may propose an easy Test-case E for measuring this very class of memory-data-transfer add-on costs:
def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """ __doc__
    The intent of this FUN() is indeed to do nothing at all,
    but to be able to benchmark
    add-on overhead costs
    raised by a need to transfer
    some large amount of data
    from a main()-process
    to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
### A NAIVE TEST BENCH
##############################################################
import joblib                                  # was missing: joblib.Parallel is used below
from zmq import Stopwatch; aClk = Stopwatch()  # pyzmq's [us]-resolution stopwatch

JOBS_TO_SPAWN =  4          # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10          # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6        # TUNE: +6, +7, +8,  +9, +10, ..
DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
    aClk.start()
    #-----------------------------------------------------<_CODE_UNDER_TEST_>
    joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                    for ( a_FAT_DATA )
                                    in  [ DATA_TO_XFER
                                          for _ in range( RUNS_TO_RUN )
                                          ]
                         )
    #-----------------------------------------------------<_CODE_UNDER_TEST_>
except Exception as anException:               # report, do not silently swallow, failures
    print( "EXC:: {0:}".format( anException ) )
finally:
    try:
        _ = aClk.stop()                        # elapsed time in [us]
    except Exception:
        _ = -1

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f}[MB] )"
print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_XFER / 1024. / 1024.   # was SIZE_TO_SEND, an undefined name
                        )
       )
Please let me know of ways I can speed up this code.
- learn about numba, a tool definitely worth knowing for performance boosting
- learn about vectorisation of operations
- after mastering these two, you might look into re-formulating already-perfect code in Cython
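import numpy as np
from zmq import Stopwatch; aClk = Stopwatch()

a, b = 3.3, 4.4
rVEC = np.random.uniform( 1, 4, int( 1E+6 ) )     # the size argument must be an int

def flop_NaivePY( r, a, b ):
    return( r + ( a * b ) )                       # allocates a brand-new 1E+6-element result array

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()   # stop() returns [us]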
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?
Yet this code is awfully wrong from a performance point of view.
Let's turn on numpy in-place assignments, avoiding duplicate memory allocations and similar processing inefficiencies:
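def flop_InplaceNUMPY( r, a, b ):
    r += a * b        # updates r in place: no new result array is allocated
    return r          # NB: every call therefore changes rVEC itself

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()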
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.?? @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT AS SEEN NOW
2.~!____[ms] @ 1.000.000 FLOAT-OPS, HALF, BETTER!
BUT ALSO TEST THE SCALING ONCE THE DATA NO LONGER FITS IN CACHE:
THAT TEST WILL SMELL OF A NEED TO OPTIMISE THE CODE DESIGN
Cautious experimenters will soon observe that the python-process may even get killed during the naive-code runs, as memory-allocation requests that cannot be satisfied make it terminate on larger sizes, above ~1E+9.
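A quick back-of-the-envelope check of why ~1E+9 elements is where things break, sketched in Python 3 with hypothetical helper names. A packed float64 buffer costs 8 bytes per element. A CPython list of floats costs at least one pointer per slot plus one float object per element. The figures from `sys.getsizeof` are CPython-specific and ignore list over-allocation and allocator overhead, so they are lower bounds. The naive `r + ( a * b )` keeps the input and allocates a full-size result, while the in-place `r += a * b` does not.

```python
import struct
import sys
from array import array

def list_of_floats_bytes(n):
    """Rough lower bound for a CPython list holding n distinct float objects."""
    pointer = struct.calcsize("P")                 # one pointer per list slot
    return sys.getsizeof([]) + n * (pointer + sys.getsizeof(1.0))

def float64_buffer_bytes(n):
    """Packed float64 storage (array('d') or a numpy float64 array's data)."""
    return n * array("d").itemsize                 # 8 bytes per C double

def peak_bytes(n, in_place):
    """Input plus result for r + a*b; the input buffer alone for r += a*b."""
    return float64_buffer_bytes(n) * (1 if in_place else 2)

if __name__ == "__main__":
    n, gib = 10**9, 2**30
    print(f"list of floats : {list_of_floats_bytes(n) / gib:6.1f} GiB")
    print(f"naive r + a*b  : {peak_bytes(n, in_place=False) / gib:6.1f} GiB")
    print(f"in-place r += .: {peak_bytes(n, in_place=True) / gib:6.1f} GiB")
```

On a 64-bit CPython this gives roughly 30 GiB for a list of floats, about 15 GiB peak for the naive vectorised version and about 7.5 GiB for the in-place one, which is consistent with the naive runs being the first to get killed.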
All this will put otherwise pure-[SERIAL] code on steroids, without paying anything but zero add-on costs, and uncle Gene Amdahl will reward your process-scheduling and hardware-architecture knowledge, and the effort spent during code design, to the max.
No better advice exists . . . except going into a pure clairvoyance business, where re-testing is never available