The TF map function supports parallel calls, but I'm seeing no improvement from passing num_parallel_calls to map. With num_parallel_calls=1 and num_parallel_calls=10, there is no difference in run time. Here is a simple example:
```python
import time
import tensorflow as tf

def test_two_custom_function_parallelism(num_parallel_calls=1, batch=False,
                                         batch_size=1, repeat=1, num_iterations=10):
    tf.reset_default_graph()
    start = time.time()
    # `squarer` is a plain Python function (definition not shown here).
    dataset_x = tf.data.Dataset.range(1000).map(
        lambda x: tf.py_func(squarer, [x], [tf.int64]),
        num_parallel_calls=num_parallel_calls).repeat(repeat)
    if batch:
        dataset_x = dataset_x.batch(batch_size)
    dataset_y = tf.data.Dataset.range(1000).map(
        lambda x: tf.py_func(squarer, [x], [tf.int64]),
        num_parallel_calls=num_parallel_calls).repeat(repeat)
    if batch:
        dataset_y = dataset_y.batch(batch_size)
    X = dataset_x.make_one_shot_iterator().get_next()
    Y = dataset_y.make_one_shot_iterator().get_next()

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        i = 0
        while True:
            try:
                res = sess.run([X, Y])
                i += 1
                if i == num_iterations:
                    break
            except tf.errors.OutOfRangeError as e:
                pass
```
Here are the timings:

```
%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=2, batch_size=2, batch=True)
370ms

%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=5, batch_size=2, batch=True)
372ms

%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=10, batch_size=2, batch=True)
384ms
```
I used %timeit in a Jupyter notebook. What am I doing wrong?
The reason may be that squarer costs less time than the overhead. So I modified the code, adding a quarter function that takes 2 seconds; then the num_parallel_calls parameter works as expected. Here is the complete code:

the output is:

So I am confused about the effect of the "Global Interpreter Lock" mentioned by @mrry.
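For illustration, here is a minimal sketch of a slow py_func of this kind (the name slow_squarer and the use of time.sleep to simulate the 2-second cost are assumptions, not the actual modified code):

```python
import time
import tensorflow as tf

# Hypothetical stand-in for a function that takes about 2 seconds per element.
# Note that time.sleep releases the GIL while waiting, so several parallel
# calls can overlap even though the function is Python code.
def slow_squarer(x):
    time.sleep(2)
    return x ** 2

dataset = tf.data.Dataset.range(8).map(
    lambda x: tf.py_func(slow_squarer, [x], tf.int64),
    num_parallel_calls=4)
```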
I set up my own version of map to get something similar to TensorFlow's Dataset.map, but one which will use multiple CPUs for py_functions.

Usage

Instead of mapping a tf.py_function over the dataset directly, you can get a CPU-parallel py_function version using the code below. (The output type(s) for the py_function can also be specified if it's not a single tf.float32.)

Internally, this creates a pool of multiprocessing workers. It still uses the regular, GIL-limited TensorFlow map, but only to pass the input to a worker and get the output back; the workers processing the data run in parallel on the CPU.

Caveats
The function passed needs to be picklable to work with the multiprocessing pool. This should work for most cases, but some closures or whatnot may fail. Packages like dill might loosen this restriction, but I haven't looked into that.

If you pass an object's method as the function, you also need to be careful about how the object is duplicated across processes (each process will have its own copy of the object, so you can't rely on its attributes being shared).

As long as these considerations are kept in mind, this code should work for many cases.
Code
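A minimal sketch of this approach (the name parallel_py_function, the pool size of 4, and the use of tf.py_function under the hood are illustrative assumptions, not the original implementation):

```python
import multiprocessing
import tensorflow as tf

# One shared pool of worker processes; the size is an arbitrary choice here.
# On platforms that use the "spawn" start method, create this inside an
# `if __name__ == "__main__":` guard instead of at import time.
_pool = multiprocessing.Pool(4)

def parallel_py_function(func, inp, Tout):
    """A py_function-like wrapper that runs `func` in a multiprocessing worker.

    The wrapper itself still executes under the GIL, but all it does is ship
    the inputs to a worker process and wait for the result, so the heavy work
    runs in parallel on separate CPUs.
    """
    def send_to_worker(*args):
        # EagerTensor -> numpy so the arguments can be pickled for the pool.
        values = [a.numpy() for a in args]
        # apply() blocks this thread (releasing the GIL) until the worker is done.
        return _pool.apply(func, values)

    return tf.py_function(send_to_worker, inp, Tout)

# Usage, assuming a module-level `slow_func` so it can be pickled:
#   dataset = dataset.map(
#       lambda x: parallel_py_function(slow_func, [x], tf.float32),
#       num_parallel_calls=4)
```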
The problem here is that the only operation in the Dataset.map() function is a tf.py_func() op. This op calls back into the local Python interpreter to run a function in the same process. Increasing num_parallel_calls will increase the number of TensorFlow threads that attempt to call back into Python concurrently. However, Python has something called the "Global Interpreter Lock" that prevents more than one thread from executing code at once. As a result, all but one of these multiple parallel calls will be blocked waiting to acquire the Global Interpreter Lock, and there will be almost no parallel speedup (and perhaps even a slight slowdown).

Your code example didn't include the definition of the squarer() function, but it might be possible to replace tf.py_func() with pure TensorFlow ops, which are implemented in C++ and can execute in parallel. For example (and just guessing by the name), you could replace it with an invocation of tf.square(x), and you might then enjoy some parallel speedup.

Note however that if the amount of work in the function is small, like squaring a single integer, the speedup might not be very large. Parallel Dataset.map() is more useful for heavier operations, like parsing a TFRecord with tf.parse_single_example() or performing some image distortions as part of a data augmentation pipeline.
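For example, assuming squarer really does just square its input, a sketch of the suggested replacement would look like this:

```python
import tensorflow as tf

# Sketch: replace the tf.py_func call with a native TensorFlow op (assuming
# `squarer` simply squares its input). tf.square runs in C++ without holding
# the GIL, so the parallel map calls can actually overlap.
dataset_x = (tf.data.Dataset.range(1000)
             .map(lambda x: tf.square(x), num_parallel_calls=10)
             .repeat(1))
```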