Updating batch image array in-place when using joblib

Posted 2019-08-05 20:27

Question:

This is a follow-up to my solution to the question below:

How to apply a function in parallel to multiple images in a numpy array?

My suggested solution works fine when process_image() returns its result, which can then be collected in a list for later processing.

Since I want to do this type of preprocessing for more than 100K images (array shape (100000, 32, 32, 3)), the solution needs to be efficient. My list-based approach uses a lot of memory, which also makes it inefficient for further processing. So I want the array to be updated in-place inside process_image() when that function is called many times through joblib.

But I'm having trouble updating the original batched image array in-place. I tried the suggestion by Eric, but it fails to update the original array. I checked whether the array memory is indeed shared among the worker processes by printing the array's flags inside process_image(). Here is my code:

import numpy as np
from skimage import exposure
from joblib import Parallel, delayed

# number of processes
nprocs = 10

# batched image array
img_arr = np.random.randint(0, 255, (1000, 32, 32, 3)).astype(np.float32)

# for verification
img_arr_copy = img_arr.copy()

# function to be applied to all images (in parallel)
# note: this function fails to update the original array in-place,
# but I want the original array updated in-place with the result of `equalize_hist`
def process_image(img, idx):
    """
    update original array in-place since all worker processes share
    original memory! i.e. they don't make copy while processing it.
    """
    print("\n processing image: ", idx)
    img[...] = exposure.equalize_hist(img)
    print("array metadata: \n", img.flags)
    print("======================= \n")

# run `process_image()` in parallel
Parallel(n_jobs=nprocs)(delayed(process_image)(img_arr[idx], idx) for idx in range(img_arr.shape[0]))

I even tried initializing an empty array with np.empty(), of the same shape as the original batched image array, and updating that instead, but that also failed. I don't know where it is going wrong.
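One possible reason for this, offered as an assumption rather than a confirmed diagnosis: with a process-based backend, joblib has to serialize each argument before a worker can use it, so the worker may be writing into a private copy of `img_arr[idx]`. The sketch below imitates that hand-off with a plain `pickle` round trip in a single process. It does not run joblib itself, and the array is a small zero-filled placeholder.

```python
import pickle

import numpy as np

# Placeholder batch (zeros), standing in for the real image array.
img_arr = np.zeros((3, 32, 32, 3), dtype=np.float32)

# This is the kind of object handed to delayed(process_image)(img_arr[idx], idx).
view = img_arr[0]

# Imitate shipping the argument to another process: serialize, then deserialize.
received = pickle.loads(pickle.dumps(view))

# The "worker" writes in-place into what it received.
received[...] = 1.0

# The deserialized array has its own buffer, so the original is untouched.
print("shares memory with original:", np.shares_memory(received, img_arr))
print("max of original image 0:", img_arr[0].max())
```

If this is what happens in the workers, the in-place assignment succeeds, but only on the worker's copy.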

To check whether the array had been updated, I used:

np.all(result_arr == img_arr)

where result_arr was initialized as:

result_arr = np.empty(img_arr.shape, dtype=np.float32)
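The check can also be made against a snapshot taken before processing, such as `img_arr_copy` in the code above. A minimal single-process sketch, where `halve_in_place` is a hypothetical stand-in for `process_image()` and the batch is deliberately tiny:

```python
import numpy as np

rng = np.random.default_rng(0)
img_arr = rng.integers(0, 255, size=(4, 32, 32, 3)).astype(np.float32)

# Snapshot taken before any processing, used only for verification.
img_arr_copy = img_arr.copy()


def halve_in_place(img):
    """Hypothetical stand-in for process_image(): overwrite img with img / 2."""
    img[...] = img / 2.0


unchanged_before = np.array_equal(img_arr, img_arr_copy)

for idx in range(img_arr.shape[0]):
    halve_in_place(img_arr[idx])  # img_arr[idx] is a view into img_arr

changed_after = not np.array_equal(img_arr, img_arr_copy)
print("unchanged before:", unchanged_before)
print("changed after:", changed_after)
```

Comparing against an array created with `np.empty()` says little by itself, because its initial contents are arbitrary.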

Where am I going wrong and what's the bug in my code? All suggestions are highly appreciated!!
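For comparison, here is a minimal sketch of a setup where in-place writes do reach the original array: worker threads, which share the parent's memory. It uses the standard library's `ThreadPoolExecutor` rather than joblib, and `normalize_in_place` is a hypothetical stand-in for `process_image()`. Whether threads give a real speed-up for `equalize_hist` is not tested here.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def normalize_in_place(img):
    """Hypothetical stand-in for process_image(): rescale one image to [0, 1]."""
    img[...] = img / 255.0


# Small placeholder batch where every pixel is 255.
batch = np.full((8, 4, 4, 3), 255.0, dtype=np.float32)

with ThreadPoolExecutor(max_workers=4) as pool:
    # Iterating over `batch` yields views, so each task writes into `batch` itself.
    # list() waits for all tasks and re-raises any worker exception.
    list(pool.map(normalize_in_place, batch))

print("max after processing:", batch.max())
```

Threads avoid the copy entirely, at the cost of depending on how much of the per-image work releases the interpreter lock.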


Printed output from the code above, used to check whether memory is shared:

processing image:  914 
 array metadata:  
 C_CONTIGUOUS : True 
 F_CONTIGUOUS : False 
 OWNDATA : False     #<=========== so memory is shared
 WRITEABLE : True 
 ALIGNED : True 
 UPDATEIFCOPY : False 
======================= 

 processing image:  614
 array metadata: 
 C_CONTIGUOUS : True
 F_CONTIGUOUS : False
 OWNDATA : False     #<=========== so memory is shared
 WRITEABLE : True
 ALIGNED : True
 UPDATEIFCOPY : False
=======================
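
A note on reading these flags: `OWNDATA : False` is also what any basic-indexing view reports within a single process, so by itself it may not demonstrate sharing between processes. A small sketch, with `base` as a placeholder array:

```python
import numpy as np

base = np.zeros((4, 32, 32, 3), dtype=np.float32)

# Basic indexing returns a view onto base's buffer; no processes are involved.
view = base[0]

# An explicit copy owns a separate buffer.
detached = view.copy()

print("view:     owndata =", view.flags.owndata,
      "| shares memory =", np.shares_memory(view, base))
print("detached: owndata =", detached.flags.owndata,
      "| shares memory =", np.shares_memory(detached, base))
```

Within one process, `np.shares_memory()` is a more direct test than the `OWNDATA` flag, though it cannot compare arrays that live in different processes.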