I work on hierarchical agglomerative clustering of large numbers of multidimensional vectors, and I noticed that the biggest bottleneck is the construction of the distance matrix. A naive implementation of this task is the following (here in Python):
import numpy as np
from scipy.spatial.distance import cosine  # scipy.spatial.distance.cosine()

def create_dist_matrix(v):
    '''v = an array (N, d), where rows are the observations
    and columns the dimensions'''
    N = v.shape[0]
    D = np.zeros((N, N))
    for i in range(N):
        for j in range(i + 1):
            D[i, j] = cosine(v[i, :], v[j, :])
    return D
I was wondering what is the best way to add some parallelism to this routine. An easy way would be to split the outer loop across a number of jobs, e.g. if you have 10 processors, create 10 jobs for different ranges of i and then concatenate the results (something like the sketch below). However, this "horizontal" solution doesn't seem quite right. Are there any other parallel algorithms (or existing libraries) for this task? Any help would be highly appreciated.
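For illustration, here is a rough sketch of what I mean by the "horizontal" split, using multiprocessing.Pool (the process count and chunk boundaries are arbitrary, and the function names are just placeholders):

import numpy as np
from multiprocessing import Pool
from scipy.spatial.distance import cosine

def rows_block(args):
    # compute the lower-triangular rows i_start..i_stop of the distance matrix
    v, i_start, i_stop = args
    N = v.shape[0]
    block = np.zeros((i_stop - i_start, N))
    for i in range(i_start, i_stop):
        for j in range(i + 1):
            block[i - i_start, j] = cosine(v[i, :], v[j, :])
    return block

def create_dist_matrix_parallel(v, n_jobs=10):
    N = v.shape[0]
    # note: equal row ranges give unequal work, since row i costs i+1 distance calls
    bounds = np.linspace(0, N, n_jobs + 1, dtype=int)
    tasks = [(v, lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]
    with Pool(n_jobs) as pool:
        blocks = pool.map(rows_block, tasks)
    return np.concatenate(blocks, axis=0)  # stack the row blocks back into (N, N)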
Looks like scikit-learn has a parallel version of pdist called pairwise_distances:

from sklearn.metrics.pairwise import pairwise_distances

D = pairwise_distances(X=v, metric='cosine', n_jobs=-1)

where n_jobs=-1 specifies that all CPUs will be used.
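For completeness, a small sketch of handing the result to scipy.cluster.hierarchy.linkage: squareform with checks=False, plus zeroing the diagonal, guards against tiny floating-point deviations in the computed matrix.

import numpy as np
from sklearn.metrics.pairwise import pairwise_distances
from scipy.spatial.distance import squareform
from scipy.cluster.hierarchy import linkage

D = pairwise_distances(X=v, metric='cosine', n_jobs=-1)
np.fill_diagonal(D, 0.0)              # guard against tiny non-zero diagonal entries
d_cond = squareform(D, checks=False)  # condensed (1D) form expected by linkage
Z = linkage(d_cond, method='average')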
See @agartland's answer — you can specify n_jobs in sklearn.metrics.pairwise.pairwise_distances, or look for a clustering algorithm in sklearn.cluster that has an n_jobs parameter, e.g. sklearn.cluster.KMeans.

Still, if you are feeling adventurous, you can implement your own computation. For example, if you need the condensed 1D distance array for scipy.cluster.hierarchy.linkage, you can use:
#!/usr/bin/env python3
from multiprocessing import Pool
import numpy as np
from time import time as ts

data = np.zeros((100, 10))      # YOUR data: np.array[n_samples x m_features]
n_processes = 4                 # YOUR number of processors

def metric(a, b):               # YOUR dist function
    return np.sum(np.abs(a - b))

n = data.shape[0]
k_max = n * (n - 1) // 2        # number of elements in the condensed 1D dist array
k_step = max(1, n ** 2 // 500)  # chunk size: ~500 chunks in total
dist = np.zeros(k_max)          # resulting 1D dist array

def proc(start):
    dist_chunk = []             # distances for this chunk of condensed indices
    k1 = start
    k2 = min(start + k_step, k_max)
    for k in range(k1, k2):
        # get (i, j) for the 2D distance matrix knowing (k) for the 1D distance matrix
        i = int(n - 2 - int(np.sqrt(-8 * k + 4 * n * (n - 1) - 7) / 2.0 - 0.5))
        j = int(k + i + 1 - n * (n - 1) / 2 + (n - i) * ((n - i) - 1) / 2)
        # compute and store the distance
        a = data[i, :]
        b = data[j, :]
        d = metric(a, b)
        dist_chunk.append(d)
    return k1, k2, dist_chunk

if __name__ == "__main__":      # guard so worker processes don't re-run the pool setup
    ts_start = ts()
    with Pool(n_processes) as pool:
        for k1, k2, res in pool.imap_unordered(proc, range(0, k_max, k_step)):
            dist[k1:k2] = res
            print("{:.0f} minutes, {:,}..{:,} out of {:,}".format(
                (ts() - ts_start) / 60, k1, k2, k_max))
    print("Elapsed %.0f minutes" % ((ts() - ts_start) / 60))
    print("Saving...")
    np.savez("dist.npz", dist=dist)
    print("DONE")
Just so you know, the scipy.cluster.hierarchy.linkage implementation isn't parallel and its complexity is at least O(N²). I'm not sure whether scipy has a parallel implementation of this function.
I doubt you will get it any faster than pdist in the scipy module. Probably this is why its documentation says:

Note that you should avoid passing a reference to one of the distance functions defined in this library. For example:

dm = pdist(X, sokalsneath)

would calculate the pair-wise distances between the vectors in X using the Python function sokalsneath. This would result in sokalsneath being called n choose 2 times, which is inefficient. Instead, the optimized C version is more efficient, and we call it using the following syntax:

dm = pdist(X, 'sokalsneath')
So no Python function is used if you call pdist(X, 'cosine'). When I run it, it seems to use only one core, so if you have a lot of cores you might get it faster. But bear in mind that, to achieve this, your own implementation has to be as fast as SciPy's, and that won't be trivial. You'd rather be patient, or go for a different clustering method, e.g. an algorithm which supports a spatial index.
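If you want to see the difference for yourself, here is a quick timing sketch (the array size is arbitrary):

import numpy as np
from time import perf_counter
from scipy.spatial.distance import pdist, cosine

X = np.random.rand(1000, 64)   # synthetic data, size chosen arbitrarily

t0 = perf_counter()
d_py = pdist(X, cosine)        # Python callable: invoked n*(n-1)/2 times
t1 = perf_counter()
d_c = pdist(X, 'cosine')       # string metric: dispatched to the optimized C code
t2 = perf_counter()

print("Python callable: %.2f s, C version: %.2f s" % (t1 - t0, t2 - t1))
print("max abs difference:", np.abs(d_py - d_c).max())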
In addition to what @agartland proposed, I like to use pairwise_distances or pairwise_distances_chunked with numpy.triu_indices to get the condensed distance vector. This is the exact output provided by scipy.spatial.distance.pdist.
It is important to note that the k kwarg for triu_indices controls the offset for the diagonal. The default value k=0 will return the diagonal of zeros as well as the real distance values, so it should be set to k=1 to avoid this.
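A tiny illustration of the offset on a 3x3 array (just index arithmetic, not distances):

import numpy as np

A = np.arange(9).reshape(3, 3)
print(A[np.triu_indices(3, k=0)])   # [0 1 2 4 5 8] -- includes the diagonal entries
print(A[np.triu_indices(3, k=1)])   # [1 2 5]       -- strictly above the diagonal, like pdist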
For large datasets I have encountered an issue where pairwise_distances raises a ValueError from struct.unpack when returning values from a worker thread. Thus my use of pairwise_distances_chunked below.
import numpy as np
from sklearn.metrics.pairwise import pairwise_distances_chunked

gen = pairwise_distances_chunked(X, metric='cosine', n_jobs=-1)
Z = np.concatenate(list(gen), axis=0)
Z_cond = Z[np.triu_indices(Z.shape[0], k=1)]
For me this is much faster than using pdist, and it scales nicely with the number of available cores.
N.B. I think it is also worth pointing out that there was in the past some confusion about the arguments for scipy.cluster.hierarchy.linkage, in that the documentation at one point indicated that users could pass a condensed or squareform distance vector/matrix (linkage() function mistakes distance matrix as observation vectors #2614). This is in fact not the case: the values passed to linkage should be either the condensed distance vector or the m x n array of raw observations.
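A minimal sketch of the two accepted call forms, reusing Z_cond and X from the snippet above:

from scipy.cluster.hierarchy import linkage

# Either pass the condensed distance vector computed above...
Z_link = linkage(Z_cond, method='average')

# ...or pass the raw (m x n) observation array and let linkage compute the distances itself.
Z_link_from_raw = linkage(X, method='average', metric='cosine')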
If you decide to orchestrate the multiprocessing yourself, you may want to split the number of calculations evenly between CPUs so as to make the overall run as short as possible. Then the replies to this question on equally splitting the diagonal matrix may come in handy.
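For what it's worth, one simple way to get an even split is to divide the condensed index range rather than the rows, since each condensed index corresponds to exactly one pairwise computation. A sketch (observation and worker counts are arbitrary):

import numpy as np

n = 10_000                    # number of observations (arbitrary)
n_workers = 8                 # number of CPUs to use (arbitrary)
k_max = n * (n - 1) // 2      # total number of pairwise computations

# Splitting the condensed index range gives every worker the same number of
# pairs, unlike splitting rows of the triangular matrix, which hands the short
# rows to some workers and the long rows to others.
bounds = np.linspace(0, k_max, n_workers + 1, dtype=int)
chunks = list(zip(bounds[:-1], bounds[1:]))   # (start, stop) index range per worker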