how to run tensorflow distributed mnist example

2020-02-04 20:25发布

问题:

I am new to distributed tensorflow. I found this distributed mnist test in here: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py

But I don't know how to make it run. I used the following script:

  python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=0 --worker_grpc_url="grpc://tf-worker0:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=1 --worker_grpc_url="grpc://tf-worker1:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=2 --worker_grpc_url="grpc://tf-worker2:2222"

I just found these parameters are missing, so I pass them to the program. Here is what happened:

I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Worker GRPC URL: grpc://tf-worker0:2222
Worker index = 0
Number of workers = 3
Worker GRPC URL: grpc://tf-worker2:2222
Worker index = 2
Number of workers = 3
Worker GRPC URL: grpc://tf-worker1:2222
Worker index = 1
Number of workers = 3
Worker 0: Initializing session...
Worker 2: Waiting for session to be initialized...
Worker 1: Waiting for session to be initialized...
E0608 20:37:13.514249023    7501 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.514287961    7501 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.548052986    7502 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.548091527    7502 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.555449386    7503 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.555473898    7503 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
^CE0608 20:37:28.517451603    7504 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.517491102    7504 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.551002331    7505 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.551029795    7505 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.556681378    7506 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.556709728    7506 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds

Anyone know how to run it correctly? Thanks a lot!

回答1:

The values of the --worker_grpc_url flag in your command-line refer to addresses that don't exist.

This script is designed to run in a particular Kubernetes environment, and not standalone. In particular tf-worker0:2222, tf-worker1:2222, and tf-worker2:2222 refer to the names of Kubernetes containers that are created by an automated version of this test. It would require considerable changes to work as a standalone test.

The documentation for distributed TensorFlow includes code for an example trainer program. The easiest way to try out MNIST on distributed TensorFlow would be to paste the model into the template. For example, something like the following should work:

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
      y_ = tf.placeholder(tf.float32, [None, 10])

      hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.summary.merge_all()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.

        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}

        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        if step % 100 == 0: 
            print "Done step %d" % step

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()