GRPC causes training to pause in individual worker

2019-04-17 08:47发布

I am trying to train model in synchronous distributed fashion for data parallelism. There are 4 gpus in my machine. Each gpu should should run a worker to train on separate non-overlapping subset of the data (between graph replication). The main data file is separated into 16 smaller TFRecord files. Each worker is supposed to process 4 different files. The problem is that training freezes independently and at different times in each worker process. They freeze at some point.

enter image description here

One of the 'ps' reports following error related to grpc:

2017-09-21 16:45:55.606842: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2000, 1 -> localhost:2001, 2 -> localhost:2002}
2017-09-21 16:45:55.606877: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2003, 1 -> localhost:2004, 2 -> localhost:2005, 3 -> localhost:2006}
2017-09-21 16:45:55.608066: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2002
E0921 16:48:52.596846076    3037 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=12325, new grpc_chttp2_stream id=12317
2017-09-21 16:48:57.497244: W tensorflow/core/framework/op_kernel.cc:1158] Out of range: End of sequence
     [[Node: data_source_task_index_0/IteratorGetNext = IteratorGetNext[output_shapes=[[-1,-1], [-1,-1], [-1,-1], [-1,-1], [-1,-1]], output_types=[DT_INT64, DT_INT64, DT_INT64, DT_INT64, DT_INT64], _device="/job:ps/replica:0/task:0/cpu:0"](data_source_task_index_0/Iterator)]]
     [[Node: data_source_task_index_0/cond/Merge_2_S341 = _Recv[client_terminated=false, recv_device="/job:ps/replica:0/task:2/cpu:0", send_device="/job:ps/replica:0/task:0/cpu:0", send_device_incarnation=-6450759800525444137, tensor_name="edge_359_data_source_task_index_0/cond/Merge_2", tensor_type=DT_INT64, _device="/job:ps/replica:0/task:2/cpu:0"]()]]
E0921 16:49:58.462749643    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24769
E0921 16:49:58.462780714    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24773
E0921 16:49:58.463260203    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24777
E0921 16:49:58.463277333    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24779
E0921 16:49:58.463283953    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24781
E0921 16:49:58.463289625    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24783
E0921 16:49:58.463295275    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24785

INPUT PIPELINE

I am using tensorflow dataset API as input pipeline. The sketch of the dataset code looks as follows:

def _dataset(filenames):
    input_files = tf.constant(filenames, dtype=tf.string)
    dataset = tf.contrib.data.TFRecordDataset(filenames)
    dataset = dataset.map(_parse_single_example)
    dataset = dataset.padded_batch(batch_size, padded_shapes=([-1], [-1]))
    iterator = dataset.make_initializable_iterator()
    words, labels = iterator.get_next()
    init_op = iterator.initializer
    return init_op, words, labels

ATTEMPT FOR DATA SEPARATION

First, we get list of files for the current worker/task.

data_files = get_file_name_for_this_worker(task_index)

Then, the data_files are feed into the dataset. The effect we want have is that no two workers process the same dataset.

Scoping the data set for each worker

    with tf.device(
        tf.train.replica_device_setter(
            worker_device = worker_device,
            ps_device     = ps_device,
            cluster       = cluster,
            ps_strategy = load_balancer)):

            global DATA_SOURCES

            # Setup dataset for each worker (in each process)
            for worker_id in range(num_workers):
                with tf.variable_scope('data_source_task_index_%d' % worker_id):
                    DATA_SOURCES[worker_id] = _dataset(data_files)

            # Select the relevent data source for current task       
            init_op, words, labels = DATA_SOURCES[task_index]

            model = build_model(words, labels)
            ...
            sess, sv, train_op = synchronise(model, p_config, server)
            train(model, sess, train_op, init_op, sv)

Training loop

The training iteration code is such that the data source is initialized after completely passing through the local data (every local epoch). The OutOfRange Exception is an indication that the epoch is complete.

def train(model, sess, train_op, init_op, sv)   
    for epoch in range(FLAGS.num_epochs):
        print("Initialising the data source")
        sess.run(init_op)
        batch = 0
        while True:
            batch += 1
            try:
                if (batch % FLAGS.report_every_batch == 0):
                    on_report_batch(model, train_op, sess)
                else:
                    sess.run(train_op)
            except tf.errors.OutOfRangeError:
                on_epoch_complete(model, sess)
                break
    print("Out of epoch loop")
    if writer:
        writer.close()
        print('Done training, total elapsed time:  %f' % (time.time()-begin_time))

def on_report_batch(model, train_op, sess):
    ...
    _, batch_loss, step = sess.run([train_op, model.batch_loss, model.global_step])
    print("Step: %d," % step, 
          " Epoch: %2d," % (epoch+1), 
          " Batch: %3d," % batch, 
          " Batch Cost: %.4f," % batch_loss,
          " Elapsed Time: %f, " % (elapsed/60.0),
          " Time per batch: %f" % time_per_batch)

登录 后发表回答