Running multiple tensorflow sessions concurrently

2019-03-24 15:33发布

问题:

I am trying to run several sessions of TensorFlow concurrently on a CentOS 7 machine with 64 CPUs. My colleague reports that he can use the following two blocks of code to produce a parallel speedup on his machine using 4 cores:

mnist.py

import numpy as np
import input_data
from PIL import Image
import tensorflow as tf
import time


def main(randint):
    print 'Set new seed:', randint
    np.random.seed(randint)
    tf.set_random_seed(randint)
    mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

    # Setting up the softmax architecture
    x = tf.placeholder("float", [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)

    # Setting up the cost function
    y_ = tf.placeholder("float", [None, 10])
    cross_entropy = -tf.reduce_sum(y_*tf.log(y))
    train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)

    # Initialization 
    init = tf.initialize_all_variables()
    sess = tf.Session(
        config=tf.ConfigProto(
            inter_op_parallelism_threads=1,
            intra_op_parallelism_threads=1
        )
    )
    sess.run(init)

    for i in range(1000):
        batch_xs, batch_ys = mnist.train.next_batch(100)
        sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

    print sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels})

if __name__ == "__main__":
    t1 = time.time()
    main(0)
    t2 = time.time()
    print "time spent: {0:.2f}".format(t2 - t1)

parallel.py

import multiprocessing
import numpy as np

import mnist
import time

t1 = time.time()
p1 = multiprocessing.Process(target=mnist.main,args=(np.random.randint(10000000),))
p2 = multiprocessing.Process(target=mnist.main,args=(np.random.randint(10000000),))
p3 = multiprocessing.Process(target=mnist.main,args=(np.random.randint(10000000),))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
t2 = time.time()
print "time spent: {0:.2f}".format(t2 - t1)

In particular, he says that he observes

Running a single process took: 39.54 seconds
Running three processes took: 54.16 seconds

However, when I run the code:

python mnist.py
==> Time spent: 5.14

python parallel.py 
==> Time spent: 37.65

As you can see, I get a significant slowdown by using multiprocessing whereas my colleague does not. Does anyone have any insight as to why this could be occurring and what can be done to fix it?

EDIT

Here is some example output. Notice that loading the data seems to occur in parallel, but training the individual models has a very sequential look in the output (and which can be verified by looking at CPU usage in top as the program executes)

#$ python parallel.py 
Set new seed: 9672406
Extracting MNIST_data/train-images-idx3-ubyte.gz
Set new seed: 4790824
Extracting MNIST_data/train-images-idx3-ubyte.gz
Set new seed: 8011659
Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.9136
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.9149
I tensorflow/core/common_runtime/local_device.cc:25] Local device intra op parallelism threads: 1
I tensorflow/core/common_runtime/local_session.cc:45] Local session inter op parallelism threads: 1
0.8931
time spent: 41.36

Another EDIT

Suppose we wish to confirm that the issue is seemingly with TensorFlow and not with multiprocessing. I replaced the contents of mnist.py with a big loop as follows:

def main(randint):
    c = 0
    for i in xrange(100000000):
        c += i

For output:

#$ python mnist.py
==> time spent: 5.16
#$ python parallel.py 
==> time spent: 4.86

Hence I think the problem here is not with multiprocessing itself.

回答1:

From comment by OP (user1936768):

I have good news: It turns out, on my system at least, my trial programs didn't execute long enough for the other instances of TF to start up. When I put a longer running example program in main, I do indeed see concurrent computations



回答2:

One possibility is that your sessions are trying to use 64 cores each and stomping on each other Perhaps try setting NUM_CORES to a lower value for each session

sess = tf.Session(
    tf.ConfigProto(inter_op_parallelism_threads=NUM_CORES,
                   intra_op_parallelism_threads=NUM_CORES))


回答3:

This can be done elegantly with Ray, which is a library for parallel and distributed Python, which lets you train your models in parallel from a single Python script.

This has the advantage of letting you parallelize "classes" by turning them in to "actors", which can be hard to do with regular Python multiprocessing. This is important because the expensive part of often initializing the TensorFlow graph. If you create an actor and then call the train method multiple times, the cost of initializing the graph is amortized.

import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
from PIL import Image
import ray
import tensorflow as tf
import time


@ray.remote
class TrainingActor(object):
    def __init__(self, seed):
        print('Set new seed:', seed)
        np.random.seed(seed)
        tf.set_random_seed(seed)
        self.mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)

        # Setting up the softmax architecture.
        self.x = tf.placeholder('float', [None, 784])
        W = tf.Variable(tf.zeros([784, 10]))
        b = tf.Variable(tf.zeros([10]))
        self.y = tf.nn.softmax(tf.matmul(self.x, W) + b)

        # Setting up the cost function.
        self.y_ = tf.placeholder('float', [None, 10])
        cross_entropy = -tf.reduce_sum(self.y_*tf.log(self.y))
        self.train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)

        # Initialization
        self.init = tf.initialize_all_variables()
        self.sess = tf.Session(
            config=tf.ConfigProto(
                inter_op_parallelism_threads=1,
                intra_op_parallelism_threads=1
            )
        )

    def train(self):
        self.sess.run(self.init)

        for i in range(1000):
            batch_xs, batch_ys = self.mnist.train.next_batch(100)
            self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys})

        correct_prediction = tf.equal(tf.argmax(self.y, 1), tf.argmax(self.y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

        return self.sess.run(accuracy, feed_dict={self.x: self.mnist.test.images,
                                                  self.y_: self.mnist.test.labels})


if __name__ == '__main__':
    # Start Ray.
    ray.init()

    # Create 3 actors.
    training_actors = [TrainingActor.remote(seed) for seed in range(3)]

    # Make them all train in parallel.
    accuracy_ids = [actor.train.remote() for actor in training_actors]
    print(ray.get(accuracy_ids))

    # Start new training runs in parallel.
    accuracy_ids = [actor.train.remote() for actor in training_actors]
    print(ray.get(accuracy_ids))

If you only want to create one copy of the dataset instead of having each actor read the dataset, you can rewrite things as follows. Under the hood, this uses the Plasma shared memory object store and the Apache Arrow data format.

@ray.remote
class TrainingActor(object):
    def __init__(self, mnist, seed):
        self.mnist = mnist
        ...

    ...

if __name__ == "__main__":
    ray.init()

    # Read the mnist dataset and put it into shared memory once
    # so that workers don't create their own copies.
    mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
    mnist_id = ray.put(mnist)

    training_actors = [TrainingActor.remote(mnist_id, seed) for seed in range(3)]

You can see more in the Ray documentation. Note I'm one of the Ray developers.