simple example of mxnet model parallelism

2019-05-31 00:50发布

The simple examples in the Guon tutorial for mxnet are very helpful to those of us who are just getting started with mxnet. As yet, there is not a simple example for model parallelism. I see the model parallelism example code for LSTM, but I am new to mxnet and it would help me (and perhaps others) to have a more streamlined example. So, I have created a model parallelism example by working off the regression example in the gluon tutorial, and by mixing in some code from mxnet.gluon.Trainer.

However, I am clearly getting something wrong. The gradients do not seem to be updated. Can anyone assist by identifying the problem(s)? The goal here is to create a linear regression model that has three layers, each held on a different gpu. The model itself is not useful, except as an example to show how initialization and training can occur for model parallelism, when using a custom block and imperative programming.

As I understand it, Trainer() is written for data parallelism. It will not work for model parallelism in that it requires all parameters to be initialized on all GPUs.

import os
import numpy as np
import mxnet as mx
from mxnet import nd, autograd, gluon
from mxnet.gluon import Block

# make some data
num_inputs = 2
num_outputs = 1
num_examples = 10000

def real_fn(X):
    return 2 * X[:, 0] - 3.4 * X[:, 1] + 4.2

X = np.random.normal(0,1, (num_examples, num_inputs))
noise = 0.001 * np.random.normal(0,1, (num_examples))
y = real_fn(X) + noise
y = y.reshape(-1,1)

# configuration
hidden_layers = 2
num_gpus = hidden_layers + 1
ctxList = [mx.gpu(i) for i in range(num_gpus)]
#ctxList = [mx.gpu() for i in range(num_gpus)]

#os.environ["MXNET_ENGINE_TYPE"] = "NaiveEngine"
print("\n")

# ======================================================================
class myDenseBlock(Block):
    """
    A custom layer
    """
    def __init__(self, layer_number, size_input, size_output, **kwargs):
        super(myDenseBlock, self).__init__(**kwargs)

        self.layer_number = layer_number
        self.size_input = size_input
        self.size_output = size_output

        with self.name_scope():
            # add parameters to the Block's ParameterDict.
            self.w = self.params.get(
                'weight',
                init= mx.init.Xavier(magnitude=2.24),
                shape=(size_input, size_output),
                grad_req = 'write')

            self.b = self.params.get(
                'bias',
                init= mx.init.Constant(0.5),
                shape=(size_output,),
                grad_req = 'write')

    def forward(self, x):
        x = x.as_in_context(ctxList[self.layer_number])
        with x.context:
            linear = nd.dot(x, self.w.data()) + self.b.data()
            return linear

# ======================================================================

# create net
net = gluon.nn.Sequential()
with net.name_scope():
    # initial layer, with X as input
    net.add(myDenseBlock(0,
        size_input = 2,
        size_output = 2))

    for ii in range(hidden_layers-1):
        net.add(myDenseBlock(ii+1,
            size_input = 2,
            size_output = 2))

    # final block, Y is nx1
    net.add(myDenseBlock(ii+2,
        size_input = 2,
        size_output = 1))


# ititialize paramerters for different blocks (layers) on different gpus.
params = net.collect_params()

"""
The parameters are:
sequential0_mydenseblock0_weight
sequential0_mydenseblock0_bias
sequential0_mydenseblock1_weight
sequential0_mydenseblock1_bias
sequential0_mydenseblock2_weight
sequential0_mydenseblock2_bias
"""

print("\ninitializing:")
for i, param in enumerate(params):
    if 'mydenseblock0' in param:
        params[param].initialize(ctx=ctxList[0])
    elif 'mydenseblock1' in param:
        params[param].initialize(ctx=ctxList[1])
    elif 'mydenseblock2' in param:
        params[param].initialize(ctx=ctxList[2])
    print("  ", i, param, "  ", params[param].list_data()[0].context)
print("\n")

def square_loss(yhat, y):
    return nd.mean((yhat - y) ** 2)

def mytrainer(updaters, params, ignore_stale_grad=False):
    #print("\n")
    for i, param in enumerate(params):
        #print(i, param, "  ", len(params[param].list_data()), params[param].list_data()[0].context)
        if params[param].grad_req == 'null':
            continue
        if not ignore_stale_grad:
            for data in params[param].list_data():
                if not data._fresh_grad:
                    print(
                        "`%s` on context %s has not been updated"%(params[param].name, str(data.context)))
                    assert False

        for upd, arr, grad in zip(updaters, params[param].list_data(), params[param].list_grad()):

            if not ignore_stale_grad or arr._fresh_grad:
                upd(i, grad, arr)
                arr._fresh_grad = False
                #print ("grad= ", grad)


batch_size = 100
epochs = 100000
iteration = -1

opt = mx.optimizer.create('adam', learning_rate=0.001, rescale_grad = 1 / batch_size)
updaters = [mx.optimizer.get_updater(opt)]

# the following definition for updaters does not work either
#updaters = [mx.optimizer.get_updater(opt) for _ in ctxList]

results = []
for e in range(epochs):
    train_groups = np.array_split(np.arange(X.shape[0]), X.shape[0]/batch_size)
    for ii, idx in enumerate(train_groups):
        iteration += 1
        xtrain, ytrain = X[idx,:], y[idx]

        xtrain = nd.array(xtrain)
        xtrain = xtrain.as_in_context(ctxList[0])

        ytrain = nd.array(ytrain).reshape((-1, 1))
        ytrain = ytrain.as_in_context(ctxList[0])

        with autograd.record():
            yhat = net(xtrain)
            error = square_loss(yhat, ytrain.as_in_context(ctxList[-1]))


            # Question: does the call to error.backward() go under the indent 
            # for autograd.record() or outside the indent? The gluon examples have 
            # it both ways

        error.backward()

        mytrainer(updaters, net.collect_params())

        if iteration%10 == 0:

            results.append([iteration, error.asnumpy().item()])
            print(("epoch= {:5,d}, iter= {:6,d},  error= {:6.3E}").format(
                e, iteration, error.asnumpy().item()))

The code fails at the "if not data._fresh_grad" test in mytrainer(). The output is:

initializing:
   0 sequential0_mydenseblock0_weight    gpu(0)
   1 sequential0_mydenseblock0_bias    gpu(0)
   2 sequential0_mydenseblock1_weight    gpu(1)
   3 sequential0_mydenseblock1_bias    gpu(1)
   4 sequential0_mydenseblock2_weight    gpu(2)
   5 sequential0_mydenseblock2_bias    gpu(2)

`sequential0_mydenseblock0_weight` on context gpu(0) has not been updated

I can verify using mx.autograd.get_symbol(error).tojson() that the computational graph only extends to the parameters on gpu(2), and does not reach other gpus.

标签: python mxnet
1条回答
beautiful°
2楼-- · 2019-05-31 01:28

Yes, per @sergei's comment, moving to v1.0.0 solves this.

查看更多
登录 后发表回答