I am trying to train a model in a synchronous, distributed, data-parallel fashion. There are 4 GPUs in my machine, and each GPU should run a worker that trains on a separate, non-overlapping subset of the data (between-graph replication). The main data file is split into 16 smaller TFRecord files, and each worker is supposed to process 4 of them. The problem is that training freezes in each worker process independently and at different times.
One of the 'ps' processes reports the following gRPC-related errors:
2017-09-21 16:45:55.606842: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2000, 1 -> localhost:2001, 2 -> localhost:2002}
2017-09-21 16:45:55.606877: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2003, 1 -> localhost:2004, 2 -> localhost:2005, 3 -> localhost:2006}
2017-09-21 16:45:55.608066: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2002
E0921 16:48:52.596846076 3037 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=12325, new grpc_chttp2_stream id=12317
2017-09-21 16:48:57.497244: W tensorflow/core/framework/op_kernel.cc:1158] Out of range: End of sequence
[[Node: data_source_task_index_0/IteratorGetNext = IteratorGetNext[output_shapes=[[-1,-1], [-1,-1], [-1,-1], [-1,-1], [-1,-1]], output_types=[DT_INT64, DT_INT64, DT_INT64, DT_INT64, DT_INT64], _device="/job:ps/replica:0/task:0/cpu:0"](data_source_task_index_0/Iterator)]]
[[Node: data_source_task_index_0/cond/Merge_2_S341 = _Recv[client_terminated=false, recv_device="/job:ps/replica:0/task:2/cpu:0", send_device="/job:ps/replica:0/task:0/cpu:0", send_device_incarnation=-6450759800525444137, tensor_name="edge_359_data_source_task_index_0/cond/Merge_2", tensor_type=DT_INT64, _device="/job:ps/replica:0/task:2/cpu:0"]()]]
E0921 16:49:58.462749643 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24769
E0921 16:49:58.462780714 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24773
E0921 16:49:58.463260203 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24777
E0921 16:49:58.463277333 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24779
E0921 16:49:58.463283953 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24781
E0921 16:49:58.463289625 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24783
E0921 16:49:58.463295275 3036 parsing.c:801] ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24785
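For reference, the cluster is created along these lines (the addresses are copied from the log above; the flag names are just how I sketch it here):

import tensorflow as tf

# 3 parameter servers and 4 workers, all on localhost (matching the log above).
cluster = tf.train.ClusterSpec({
    "ps": ["localhost:2000", "localhost:2001", "localhost:2002"],
    "worker": ["localhost:2003", "localhost:2004",
               "localhost:2005", "localhost:2006"],
})

# Every process starts a server for its own job name and task index.
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":
    server.join()  # ps processes just run the server loop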
INPUT PIPELINE
I am using the TensorFlow Dataset API as the input pipeline. A sketch of the dataset code looks as follows:
def _dataset(filenames):
    input_files = tf.constant(filenames, dtype=tf.string)
    dataset = tf.contrib.data.TFRecordDataset(input_files)
    dataset = dataset.map(_parse_single_example)
    # batch_size is defined in the surrounding scope; pad each feature
    # to the longest sequence in the batch.
    dataset = dataset.padded_batch(batch_size, padded_shapes=([-1], [-1]))
    iterator = dataset.make_initializable_iterator()
    words, labels = iterator.get_next()
    init_op = iterator.initializer
    return init_op, words, labels
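_parse_single_example is a standard tf.parse_single_example call, roughly like this (the feature names and types here are placeholders, not my exact schema):

def _parse_single_example(serialized):
    # Placeholder schema: two variable-length int64 sequences per example.
    features = tf.parse_single_example(
        serialized,
        features={
            'words': tf.VarLenFeature(tf.int64),
            'labels': tf.VarLenFeature(tf.int64),
        })
    words = tf.sparse_tensor_to_dense(features['words'])
    labels = tf.sparse_tensor_to_dense(features['labels'])
    return words, labels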
ATTEMPT FOR DATA SEPARATION
First, we get the list of files for the current worker/task.
data_files = get_file_name_for_this_worker(task_index)
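get_file_name_for_this_worker just shards the 16 TFRecord files across the workers, something like this (simplified; the actual paths differ):

def get_file_name_for_this_worker(task_index):
    # 16 files, 4 workers -> 4 non-overlapping files per worker.
    all_files = sorted(tf.gfile.Glob('data/train-*.tfrecord'))
    return all_files[task_index::num_workers]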
Then, the data_files are fed into the dataset. The effect we want is that no two workers process the same files.
Scoping the dataset for each worker:
with tf.device(
        tf.train.replica_device_setter(
            worker_device=worker_device,
            ps_device=ps_device,
            cluster=cluster,
            ps_strategy=load_balancer)):

    global DATA_SOURCES
    # Set up a dataset for each worker (in each process)
    for worker_id in range(num_workers):
        with tf.variable_scope('data_source_task_index_%d' % worker_id):
            DATA_SOURCES[worker_id] = _dataset(data_files)

    # Select the relevant data source for the current task
    init_op, words, labels = DATA_SOURCES[task_index]

    model = build_model(words, labels)
    ...
    sess, sv, train_op = synchronise(model, p_config, server)
    train(model, sess, train_op, init_op, sv)
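synchronise is where the synchronous part is set up. A rough sketch of what it does, assuming tf.train.SyncReplicasOptimizer and a tf.train.Supervisor (the optimizer and hyperparameters below are placeholders):

def synchronise(model, p_config, server):
    # Sketch only: synchronous training via SyncReplicasOptimizer + Supervisor.
    opt = tf.train.SyncReplicasOptimizer(
        tf.train.AdamOptimizer(0.001),          # placeholder optimizer
        replicas_to_aggregate=num_workers,
        total_num_replicas=num_workers)
    train_op = opt.minimize(model.batch_loss, global_step=model.global_step)

    is_chief = (task_index == 0)
    sv = tf.train.Supervisor(is_chief=is_chief,
                             global_step=model.global_step)
    sess = sv.prepare_or_wait_for_session(server.target, config=p_config)

    if is_chief:
        # The chief runs the token queue that lets the other replicas proceed.
        sv.start_queue_runners(sess, [opt.get_chief_queue_runner()])
        sess.run(opt.get_init_tokens_op())

    return sess, sv, train_op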
Training loop
The training loop is written so that the data source is re-initialized at the start of every local epoch, i.e. after each complete pass through the local data. The OutOfRangeError is the signal that an epoch is complete.
def train(model, sess, train_op, init_op, sv):
    begin_time = time.time()
    for epoch in range(FLAGS.num_epochs):
        print("Initialising the data source")
        sess.run(init_op)
        batch = 0
        while True:
            batch += 1
            try:
                if batch % FLAGS.report_every_batch == 0:
                    on_report_batch(model, train_op, sess)
                else:
                    sess.run(train_op)
            except tf.errors.OutOfRangeError:
                # End of the local data for this epoch.
                on_epoch_complete(model, sess)
                break
    print("Out of epoch loop")
    if writer:
        writer.close()
    print('Done training, total elapsed time: %f' % (time.time() - begin_time))
def on_report_batch(model, train_op, sess):
    ...
    _, batch_loss, step = sess.run([train_op, model.batch_loss, model.global_step])
    print("Step: %d," % step,
          " Epoch: %2d," % (epoch + 1),
          " Batch: %3d," % batch,
          " Batch Cost: %.4f," % batch_loss,
          " Elapsed Time: %f, " % (elapsed / 60.0),
          " Time per batch: %f" % time_per_batch)