如何运行TensorFlow分布式MNIST示例?

12

我是分布式TensorFlow的新手。我在这里找到了这个分布式MNIST测试: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py

但我不知道如何运行它。我使用了以下脚本:

  python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=0 --worker_grpc_url="grpc://tf-worker0:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=1 --worker_grpc_url="grpc://tf-worker1:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=2 --worker_grpc_url="grpc://tf-worker2:2222"

我刚刚发现这些参数缺失了,所以我将它们传递给程序。这是发生的情况:

I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Worker GRPC URL: grpc://tf-worker0:2222
Worker index = 0
Number of workers = 3
Worker GRPC URL: grpc://tf-worker2:2222
Worker index = 2
Number of workers = 3
Worker GRPC URL: grpc://tf-worker1:2222
Worker index = 1
Number of workers = 3
Worker 0: Initializing session...
Worker 2: Waiting for session to be initialized...
Worker 1: Waiting for session to be initialized...
E0608 20:37:13.514249023    7501 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.514287961    7501 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.548052986    7502 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.548091527    7502 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.555449386    7503 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.555473898    7503 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
^CE0608 20:37:28.517451603    7504 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.517491102    7504 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.551002331    7505 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.551029795    7505 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.556681378    7506 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.556709728    7506 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds

有人知道如何正确运行它吗?非常感谢!

1个回答

17

您的命令行中 --worker_grpc_url 标志的值指向不存在的地址。

脚本旨在在特定的 Kubernetes 环境中运行,而不是独立运行。特别地,tf-worker0:2222tf-worker1:2222tf-worker2:2222 是由此测试的自动化版本创建的 Kubernetes 容器的名称。要使其作为独立测试工作,需要进行相当大的更改。

分布式 TensorFlow 的文档包括一个示例训练程序的代码。尝试在分布式 TensorFlow 上运行 MNIST 最简单的方法是将模型粘贴到模板中。例如,类似以下内容的东西应该可以工作:

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
      y_ = tf.placeholder(tf.float32, [None, 10])

      hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.summary.merge_all()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.

        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}

        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        if step % 100 == 0: 
            print "Done step %d" % step

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()

非常感谢您的解释!我会尝试一下。 - xyd
非常感谢!我只是为你的精彩帖子贡献了我的意见。tf.merge_all_summaries() 似乎已经被弃用,在最新版本中会出现错误,可以使用 tf.merge_all 或 tf.contrib.deprecated.merge_all_summaries。 - sunil manikani
@sunilmanikani 感谢您指出这个问题... 我已经更新了代码,现在使用 tf.summary.merge_all() - mrry
FYI,tf.train.Supervisor现已被弃用,应改用tf.train.MonitoredTrainingSession。请参见https://www.tensorflow.org/api_docs/python/tf/train/Supervisor。 - erwaman
我的理解是每个工作进程都在读取完整的输入数据;没有对输入数据进行分区。我的理解正确吗? - erwaman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接