I am implementing a neural network whose input and output matrices are very large, so I am storing them as dask arrays. X is the input matrix of size 32000 x 7500 and y is the output matrix of the same dimensions. Below is the neural network code with one hidden layer:
import numpy as np
import dask.array as da

class Neural_Network(object):
    def __init__(self, i, j, k):
        # define hyperparameters
        self.inputLayerSize = i
        self.outputLayerSize = j
        self.hiddenLayerSize = k
        # weights
        self.W1 = da.random.normal(0.5, 0.5, size=(self.inputLayerSize, self.hiddenLayerSize), chunks=(1000, 1000))
        self.W2 = da.random.normal(0.5, 0.5, size=(self.hiddenLayerSize, self.outputLayerSize), chunks=(1000, 1000))
        self.W1 = self.W1.astype('float96')
        self.W2 = self.W2.astype('float96')

    def forward(self, X):
        self.z2 = X.dot(self.W1)
        self.a2 = self.z2.map_blocks(self.sigmoid)
        self.z3 = self.a2.dot(self.W2)
        yhat = self.z3.map_blocks(self.sigmoid)
        return yhat

    def exp(self, z):
        return np.exp(z)

    def sigmoid(self, z):
        # sigmoid function
        ## return 1/(1+np.exp(-z))
        return 1/(1+(-z).map_blocks(self.exp))

    def sigmoidprime(self, z):
        ez = (-z).map_blocks(self.exp)
        return ez/(1+ez**2)

    def costFunction(self, X, y):
        self.yHat = self.forward(X)
        return 1/2*sum((y-self.yHat)**2)

    def costFunctionPrime(self, X, y):
        self.yHat = self.forward(X)
        self.error = -(y - self.yHat)
        self.delta3 = self.error*self.z3.map_blocks(self.sigmoidprime)
        dJdW2 = self.a2.transpose().dot(self.delta3)
        self.delta2 = self.delta3.dot(self.W2.transpose())*self.z2.map_blocks(self.sigmoidprime)
        dJdW1 = X.transpose().dot(self.delta2)
        return dJdW1, dJdW2
Now I try to minimize the cost function as below:
>>> n = Neural_Network(7420,7420,5000)
>>> for i in range(0,500):
        cost1, cost2 = n.costFunctionPrime(X, y)
        n.W1 = n.W1 - 3*cost1
        n.W2 = n.W2 - 3*cost2
        if i % 5 == 0:
            print(i*100/500, '%')
But when i reaches around 120, it gives me this error:
File "<pyshell#127>", line 3, in <module>
n.W1 = n.W1 -3*cost1
File "c:\python34\lib\site-packages\dask\array\core.py", line 1109, in __sub__
return elemwise(operator.sub, self, other)
File "c:\python34\lib\site-packages\dask\array\core.py", line 2132, in elemwise
dtype=dt, name=name)
File "c:\python34\lib\site-packages\dask\array\core.py", line 1659, in atop
return Array(merge(dsk, *dsks), out, chunks, dtype=dtype)
File "c:\python34\lib\site-packages\toolz\functoolz.py", line 219, in __call__
return self._partial(*args, **kwargs)
File "c:\python34\lib\site-packages\toolz\curried\exceptions.py", line 20, in merge
return toolz.merge(*dicts, **kwargs)
File "c:\python34\lib\site-packages\toolz\dicttoolz.py", line 39, in merge
rv.update(d)
MemoryError
It also gives a MemoryError when I do n.W1.compute().
This looks like it's failing while building the graph, not during computation. Two things come to mind:
Avoid excessive looping
Each iteration of your for loop may be dumping millions of tasks into the task graph. Each task probably takes up something like 100B to 1kB. When these add up they can easily overwhelm your machine.
In a typical deep learning library, like Theano, you would use a scan operation for something like this. Dask.array has no such operation.
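One possible workaround is to force each update to actually compute before you start the next iteration, so the task graph never accumulates across iterations. This is only a rough sketch and assumes the weight matrices fit in memory as NumPy arrays once computed:

import dask.array as da

for i in range(0, 500):
    dJdW1, dJdW2 = n.costFunctionPrime(X, y)
    # compute the updated weights now instead of growing one huge lazy graph
    W1 = (n.W1 - 3*dJdW1).compute()
    W2 = (n.W2 - 3*dJdW2).compute()
    # wrap the concrete results back into dask arrays for the next iteration
    n.W1 = da.from_array(W1, chunks=(1000, 1000))
    n.W2 = da.from_array(W2, chunks=(1000, 1000))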
Avoid inserting graphs into graphs

You call map_blocks on a function that itself calls map_blocks.
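For example, costFunctionPrime applies sigmoidprime with map_blocks, and sigmoidprime itself calls map_blocks again inside each block:

self.delta3 = self.error*self.z3.map_blocks(self.sigmoidprime)   # outer map_blocks

def sigmoidprime(self, z):
    ez = (-z).map_blocks(self.exp)                                # inner map_blocks
    return ez/(1+ez**2)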
Instead you might just make a sigmoid prime function
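For instance, something like the following; the name sigmoid_prime and the plain-function form are my own, and it also fixes the parenthesization to (1 + ez)**2 rather than 1 + ez**2:

import numpy as np

def sigmoid_prime(z):
    # derivative of the sigmoid: exp(-z) / (1 + exp(-z))**2,
    # written with plain NumPy so it works on one block at a time
    ez = np.exp(-z)
    return ez / (1 + ez)**2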
And then map that function
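For example, the corresponding line in costFunctionPrime would become something like:

self.delta2 = self.delta3.dot(self.W2.transpose()) * self.z2.map_blocks(sigmoid_prime)

This way each block is handled by ordinary NumPy code and no dask graph is nested inside another.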
Deep learning is tricky
Generally speaking, doing deep learning well often requires specialization. The libraries designed to do this well generally aren't general purpose for a reason. A general-purpose library like dask.array might be useful, but it will probably never reach the smooth operation of a library like Theano.
A possible approach
You might try building a function that takes just one step. It would read from disk, do all of your dot products, transposes, and normal computations, and would then store explicitly into an on-disk dataset. You would then call this function many times. Even then I'm not convinced that the scheduling policies behind dask.array could do this well.
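For concreteness, here is a rough sketch of such a step function, using h5py for the on-disk weights; the file names, dataset paths, and the take_step name are mine, not anything built into dask:

import h5py
import dask.array as da

def take_step(n, X, y, lr=3.0):
    # read the current weights from disk, take one gradient-descent step,
    # and write the new weights straight back into an on-disk dataset
    with h5py.File('weights.hdf5', mode='r') as f:
        n.W1 = da.from_array(f['/W1'], chunks=(1000, 1000))
        n.W2 = da.from_array(f['/W2'], chunks=(1000, 1000))
        dJdW1, dJdW2 = n.costFunctionPrime(X, y)
        da.to_hdf5('weights-next.hdf5', {'/W1': n.W1 - lr*dJdW1,
                                         '/W2': n.W2 - lr*dJdW2})
    # afterwards move weights-next.hdf5 over weights.hdf5 and call take_step again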