Better way to share memory for multiprocessing in

Posted 2020-06-03 03:36

I have been tackling this problem for a week now, and it's getting pretty frustrating: every time I implement a simpler, similar-scale example of what I need to do, multiprocessing fudges it up. The way it handles shared memory baffles me; it is so limited that it becomes useless quite rapidly.

So the basic description of my problem is that I need to create a process that gets passed in some parameters to open an image and create about 20K patches of size 60x40. These patches are saved into a list 2 at a time and need to be returned to the main thread to then be processed again by 2 other concurrent processes that run on the GPU.

The process and the workflow are mostly taken care of. The part that was supposed to be the easiest is turning out to be the most difficult: I have not been able to save the list of 20K patches and get it back to the main thread.

First problem was because I was saving these patches as PIL images. I then found out all data added to a Queue object has to be pickled. Second problem was I then converted the patches to an array of 60x40 each and saved them to a list. And now that still doesn't work? Apparently Queues have a limited amount of data they can save otherwise when you call queue_obj.get() the program hangs.

I have tried many other things, and every new thing I try does not work, so I would like to know if anyone can recommend another library I can use to share objects without all the fuss.

Here is a sample implementation of kind of what I'm looking at. Keep in mind this works perfectly fine, but the full implementation doesn't. And I do have the code print informational messages to see that the data being saved has the exact same shape and everything, but for some reason it doesn't work. In the full implementation the independent process completes successfully but freezes at q.get().

from PIL import Image
from multiprocessing import Queue, Process
import StringIO
import numpy

img = Image.open("/path/to/image.jpg")
q = Queue()
q2 = Queue()
#
#
# MAX Individual Queue limit for 60x40 images in BW is 31,466.
# Multiple individual Queues can be filled to the max limit of 31,466.
# A single Queue can only take up to 31,466, even if split up in different puts.
def rz(patch, qn1, qn2):
    totalPatchCount = 20000
    channels = 1
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    # ImgArray = numpy.asarray(im, dtype=numpy.float32)
    list_im_arr = []
    # ----Create a 4D Array
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60))
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    imgArray = imgArray[numpy.newaxis, ...]
    # ----End 4D array
    # list_im_arr2 = []
    for i in xrange(totalPatchCount):
        # returnImageArray[i] = imgArray
        list_im_arr.append(imgArray)
    qn1.put(list_im_arr)
    qn1.cancel_join_thread()
    # qn2.cancel_join_thread()
    print "PROGRAM Done"

# rz(img,q,q2)
# l = q.get()

#
p = Process(target=rz,args=(img, q, q2,))
p.start()
p.join()
#
# # l = []
# # for i in xrange(1000): l.append(q.get())
#
imdata = q.get()

2 Answers
Lonely孤独者°
#2 · 2020-06-03 03:47

You say "Apparently Queues have a limited amount of data they can save otherwise when you call queue_obj.get() the program hangs."

You're right and wrong there. There is a limited amount of information the Queue will hold without being drained. The problem is that when you do:

qn1.put(list_im_arr)
qn1.cancel_join_thread()

it schedules the communication to the underlying pipe (handled by a background thread). The qn1.cancel_join_thread() call then says "it's fine to exit without the scheduled put completing", and of course, a few microseconds later, the worker function returns and the Process exits without waiting for the thread that populates the pipe to finish. At best it might have sent the initial bytes of the object, but anything that doesn't fit in PIPE_BUF almost certainly gets dropped; you'd need some amazing race conditions to get anything at all, let alone the whole of a large object. So later, when you do:

imdata = q.get()

nothing has actually been sent by the (now exited) Process. When you call q.get() it's waiting for data that never actually got transmitted.

The other answer is correct that in the case of computing and conveying a single value, Queues are overkill. But if you're going to use them, you need to use them properly. The fix would be to:

  1. Remove the call to qn1.cancel_join_thread() so the Process doesn't exit until the data has been transmitted across the pipe.
  2. Rearrange your calls to avoid deadlock.

Rearranging is just this:

p = Process(target=rz,args=(img, q, q2,))
p.start()

imdata = q.get()
p.join()

This moves p.join() after q.get(). If you try to join first, your main process will be waiting for the child to exit, and the child will be waiting for the queue to be consumed before it exits. (Joining first might actually work if the Queue's pipe is drained by a thread in the main process, but it's best not to count on implementation details like that; the form above is correct regardless of implementation, as long as puts and gets are matched.)
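
The two fixes above can be put together in a small sketch. It is written for Python 3 and uses only the standard library, so zero-filled bytes objects stand in for the real 60x40 float32 patches; the names make_patches, worker and collect are made up for illustration and are not from the question.

```python
from multiprocessing import Process, Queue

PATCH_BYTES = 60 * 40 * 4  # size of one 60x40 float32 patch, as in the question


def make_patches(count):
    # Stand-in for the real patch extraction (an assumption): each "patch" is
    # a zero-filled bytes object as large as a 60x40 float32 array.
    return [bytes(PATCH_BYTES) for _ in range(count)]


def worker(q, count):
    # put() only hands the list to the queue's feeder thread. Because
    # cancel_join_thread() is NOT called, the child stays alive until the
    # feeder thread has written everything to the pipe.
    q.put(make_patches(count))


def collect(count):
    q = Queue()
    p = Process(target=worker, args=(q, count))
    p.start()
    patches = q.get()  # drain the queue first ...
    p.join()           # ... and only then wait for the child to exit
    return patches


if __name__ == "__main__":
    patches = collect(2000)  # about 19 MB, far more than a pipe buffer holds
    print(len(patches), len(patches[0]))
```

Swapping the q.get() and p.join() lines in collect is the quickest way to see the hang the question describes, since the child then blocks on a full pipe that nobody is reading.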

Luminary・发光体
#3 · 2020-06-03 03:53

Queue is for communication between processes. In your case, you don't really have this kind of communication. You can simply let the process return its result and use the .get() method to collect it. (Remember to add if __name__ == "__main__":, see the programming guidelines.)

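from PIL import Image
from multiprocessing import Pool
import numpy

def rz(imgArray):
    # Runs in a worker process; the returned list is pickled and sent back.
    totalPatchCount = 20000
    list_im_arr = [imgArray] * totalPatchCount  # A more elegant way than a for loop
    return list_im_arr

if __name__ == '__main__':
    img = Image.open("/path/to/image.jpg")
    # patch = img....  Your code to generate the patch here
    patch = img  # placeholder so the example runs; the question also passes img
    patch = patch.resize((60, 40), Image.LANCZOS)  # LANCZOS is the current name for ANTIALIAS
    patch = patch.convert('L')
    # Pass a numpy array instead of the PIL image so the argument pickles cleanly.
    imgArray = numpy.asarray(patch, dtype=numpy.float32)

    pool = Pool(2)
    # Submit both tasks first, then collect, so the two workers run concurrently.
    results = [pool.apply_async(rz, (imgArray,)) for x in range(2)]
    imdata = [r.get() for r in results]
    pool.close()
    pool.join()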

Now, according to the first answer of this post, multiprocessing only passes objects that are picklable. Pickling is probably unavoidable in multiprocessing because processes don't share memory. They simply don't live in the same universe. (They do inherit memory when they're first spawned, but they cannot reach out of their own universe.) The PIL image object itself is not picklable. You can make it picklable by extracting only the image data stored in it, as this post suggested.
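
As a rough sketch of that "extract only the image data" idea, the snippet below packs the three values needed to rebuild an image into a plain dict and sends it through pickle by hand, which is what multiprocessing does implicitly for arguments and results. It is standard-library Python 3 only; to_picklable and round_trip are hypothetical helper names, and the Pillow calls appear only in comments.

```python
import pickle


def to_picklable(mode, size, data):
    # With Pillow, these values would come from img.mode, img.size and
    # img.tobytes(); a plain dict of built-in types always pickles.
    return {"mode": mode, "size": tuple(size), "data": bytes(data)}


def round_trip(payload):
    # Serialize and deserialize, as happens when an object crosses a
    # process boundary.
    return pickle.loads(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    width, height = 60, 40
    # A blank 8-bit grayscale ("L") patch: one byte per pixel.
    gray = to_picklable("L", (width, height), bytearray(width * height))
    copy = round_trip(gray)
    print(copy["mode"], copy["size"], len(copy["data"]))
    # On the receiving side, Pillow's
    # Image.frombytes(copy["mode"], copy["size"], copy["data"])
    # would turn the payload back into an image.
```

The same approach works with a numpy array in place of the dict, since arrays pickle directly.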

Since your problem is mostly I/O bound, you can also try multi-threading. It might be even faster for your purpose. Threads share everything, so no pickling is required. If you're using Python 3, ThreadPoolExecutor is a wonderful tool. For Python 2, you can use ThreadPool. To achieve higher efficiency, you'll have to rearrange how you do things: break up the work and let different threads each do part of the job.

from PIL import Image
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
import numpy

img = Image.open("/path/to/image.jpg")
lock = Lock()
totalPatchCount = 20000

def rz(x):
    patch = ...
    return patch

pool = ThreadPool(8)
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)]
pool.close()
pool.join()
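
For Python 3, the ThreadPoolExecutor variant mentioned above might look roughly like this. It is a sketch under assumptions: rz here is a stand-in that builds a 60x40 grid of zeros rather than cutting a real patch, and build_patches is a made-up helper name.

```python
from concurrent.futures import ThreadPoolExecutor


def rz(index):
    # Hypothetical stand-in for extracting patch number `index` from the
    # image: 40 rows of 60 zeros.
    return [[0.0] * 60 for _ in range(40)]


def build_patches(total, workers=8):
    # Threads share the parent's memory, so results are returned directly,
    # with no pickling involved.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(rz, range(total)))  # map() keeps input order


if __name__ == "__main__":
    imdata = build_patches(2000)
    print(len(imdata), len(imdata[0]), len(imdata[0][0]))
```

Threads mainly pay off when the per-patch work spends its time in I/O or in library code that releases the GIL; pure-Python loops like the stand-in above will not run faster this way.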