I am experimenting with distributed training options on Cloud ML Engine and I observing some peculiar results. I have basically altered the census custom estimator example to contain a slightly different model and changed my loss function to AdamOptimizer as the only real changes. Based on this other thread, my understanding is that any distributed training should be data-parallel asynchronous training which would suggest "If you distribute 10,000 batches among 10 worker nodes, each node works on roughly 1,000 batches." In my experiment, I have ~650k training examples and I am running the following experiments for 1 epoch with a batch size of 128. Given 650k training examples and a batch size of 128, I would expect there to be ~5.1k steps in an epoch. Here is the performance that I am seeing for different --scale-tier
's
NOT DISTRIBUTED
- BASIC: 8 steps/sec, 5.1k steps, 11 minute wall time
- BASIC_GPU: 24 steps/sec, 5.1k steps, 3.5 minute wall time
DISTRIBUTED
STANDARD_1: 14.5 steps/sec -- 26k steps (26k*128 = ~3.3M which is way more than the training samples actually in the data), 29 min wall time
CUSTOM -- 5 complex_model_m workers, 2 large_model parameter servers: 27 steps/sec, 31k steps (128*31k = ~3.9M which is way more than the 650k training samples actually in the data), wall time 20 minutes
My expectation was that the data-parallel based on the article was that the distributed training would split up the batches amongst all of the workers so if I had 5 workers on ~5k batches, then each worker would perform ~1,000 batches. However, the actual behavior that I am observing is that it seems closer to EACH of the 5 workers performing 1 epoch themselves. When training in a distributed setting, there are 6x as many steps taken in an epoch as there are training examples -- I know that the true definition of a step is each time the gradients are updated, but my understanding of data parallel training is that this would just split up the batches so there should be the same number of gradient updates -- is there any reason why this behavior would be expected? Would it make sense for there to be more train steps needed in a data-parallel asynchronous training distributed environment? Can anybody explain the behavior that I am observing?
The previous answer did a good job at explaining the performance bottlenecks. Let me explain about "epochs" and how TensorFlow processes datasets.
The way that distributed training works in TensorFlow is that each worker independently iterates through the entire dataset. It is a common misconception that the training set is partitioned amongst the workers, but this is not the case.
In a typical setup with queues (see this tutorial), what happens is each worker creates it's own queue. That queue gets filled with a list of all the filenames of all the training files (typically the list is shuffled and everytime the queue is exhausted, it gets repopulated and reshuffled). Each file is read in instance-by-instance, and the data is parsed, preprocessed, and then fed into another queue where the instances are shuffled and batched. Once the last instance of any file is read, the next filename is popped off the filename queue. If there are no more files to pop, an "epoch" has completed.
The important point here is that all of these queues are by default local -- not shared. So every worker is independently repeating the same work -- creating queues with all the files and iterating through the entire dataset. A full epoch, then, is roughly equal to the number of instances in the full dataset * the number of workers. (I'm not sure about your standard_1 result, but the result on CUSTOM means you have your master + 5 workers = 6 workers * 650K examples * (1 batch / 128 examples) = 31K steps).
FYI the use of epochs are discouraged for parameterizing distributed training because it's too confusing and there may even be issues with it in general. Just stick with max_steps.
Note that, as a consequence of TensorFlow's design, "batch size" means the batch size of each worker. But each worker is going to be sending updates to the parameter servers at roughly the same rate, so over a time period roughly equivalent to the time need to process one "batch", the number of updates that happen to the parameters are roughly batch_size
* num_workers
. This is what we call the effective batch size. This in turn has a few practical consequences:
- You tend to use smaller batch_sizes, especially if you have a large number of workers, so that the effective batch size stays sane.
- As you increase the number of workers, your effective batch size increase and hence you need to decrease your learning rate, at least when using "vanilla" stochastic gradient descent. Optimizers with adaptive learning rates (such as Adagrad, Adam, etc.) tend to be robust to the initial learning rates, but if you add enough workers you still may need to adjust the learning rate.
You may wonder why TensorFlow handles training data in this fashion. It's because in distributed systems, you can't rely on machines being the same speeds, or even being reliable at all. If you partition the training data into disjoint sets that go to each worker and then one or more machines are slow relative to the other or the network goes down on one, etc., your training process will see the data from the "fast"/reliable workers more frequently than the "slow"/unreliable workers. That biases the results towards those instances (or in extreme cases, ignores it all together).
There are two types of severs:
- Workers: which perform the graph computation
- Parameters servers: which store the parameters, so all the workers can share and update the parameters
You can have a bottle neck in the parameters servers. If the model is very big and you have few parameters servers you need more communication between the workers and the parameters server. For example, if you have 2 parameters servers you will have half of the parameters of the model in one server and the other half in the other. If you have a lot of workers they will have to get and send a lot of parameters to different workers, and the workers will have a big lag. If you increase the number of parameters servers the lag will be reduced because there is less communication between each parameter server and the workers.
As your batch size is 128 (quite big) and you can perform 8 steps per second only in the CPU I think your model is so fast that it takes more time to share the parameters of the server than running one iteration. So you could also try to increase the batch size.