Why is communication via shared memory so much slo

2019-04-23 23:48发布

I am using Python 2.7.5 on a recent vintage Apple MacBook Pro which has four hardware and eight logical CPUs; i.e., the sysctl utility gives:

$ sysctl hw.physicalcpu
hw.physicalcpu: 4
$ sysctl hw.logicalcpu
hw.logicalcpu: 8

I need to perform some rather complicated processing on a large 1-D list or array, and then save the result as an intermediate output which will be used again at a later point in a subsequent calculation within my application. The structure of my problem lends itself rather naturally to parallelization, so I thought that I would try to use Python's multiprocessing module to subdivide the 1D array into several pieces (either 4 pieces or 8 pieces, I'm not yet sure which), perform the calculations in parallel, and then reassemble the resulting output into its final format afterwards. I am trying to decide whether to use multiprocessing.Queue() (message queues) or multiprocessing.Array() (shared memory) as my preferred mechanism for communicating the resulting calculations from the child processes back up to the main parent process, and I have been experimenting with a couple of "toy" models in order to make sure that I understand how the multiprocessing module actually works. I've come across a rather unexpected result, however: in creating two essentially equivalent solutions to the same problem, the version which uses shared memory for interprocess communication seems to require much more execution time (like 30X more!) than the version using message queues. Below, I've included two different versions of sample source code for a "toy" problem which generates a long sequence of random numbers using parallel processes, and communicates the agglomerated result back to a parent process in two different ways: first using message queues, and the second time using shared memory.

Here is the version that uses message queues:

import random
import multiprocessing
import datetime

def genRandom(count, id, q):

    print("Now starting process {0}".format(id))
    output = []
    # Generate a list of random numbers, of length "count"
    for i in xrange(count):
        output.append(random.random())
    # Write the output to a queue, to be read by the calling process 
    q.put(output)

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and queues 
    jobs = []
    outqs = []
    for i in xrange(0, procs):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=genRandom, args=(size, i, q))
        jobs.append(p)
        outqs.append(q)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Read out the data from the queues
    data = []
    for q in outqs:
        data.extend(q.get())

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

When I run it, I get a result that typically looks about like this:

$ python multiproc_queue.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 0.514805 seconds

Now, here is the equivalent code segment, but refactored just slightly so that it uses shared memory instead of queues:

import random
import multiprocessing
import datetime

def genRandom(count, id, d):

    print("Now starting process {0}".format(id))
    # Generate a list of random numbers, of length "count", and write them
    # directly to a segment of an array in shared memory
    for i in xrange(count*id, count*(id+1)):
        d[i] = random.random()

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and a block of shared memory
    jobs = []
    data = multiprocessing.Array('d', size*procs)
    for i in xrange(0, procs):
        p = multiprocessing.Process(target=genRandom, args=(size, i, data))
        jobs.append(p)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Ensure all of the processes have finished
    for j in jobs:
    j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

When I run the shared memory version, however, the typical result looks more like this:

$ python multiproc_shmem.py 
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 15.839607 seconds

My question: why is there such a huge difference in execution speeds (roughly 0.5 seconds vs. 15 seconds, a factor of 30X!) between the two versions of my code? And in particular, how can I modify the shared memory version in order to get it to run faster?

1条回答
该账号已被封号
2楼-- · 2019-04-24 00:25

This is because multiprocessing.Array uses a lock by default to prevent multiple processes from accessing it at once:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

...

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

This means you're not really concurrently writing to the array - only one process can access it at a time. Since your example workers are doing almost nothing but array writes, constantly waiting on this lock badly hurts performance. If you use lock=False when you create the array, the performance is much better:

With lock=True:

Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 4.811205 seconds

With lock=False:

Now starting process 0
Now starting process 3
Now starting process 1
Now starting process 2
4000000 random numbers generated in 0.192473 seconds

Note that using lock=False means you need to manually protect access to the Array whenever you're doing something that isn't process-safe. Your example is having processes write to unique parts, so it's ok. But if you were trying to read from it while doing that, or had different processes write to overlapping parts, you would need to manually acquire a lock.

查看更多
登录 后发表回答