Parallel construction of a distance matrix

Published 2020-06-03 00:57

Question:

I work on hierarchical agglomerative clustering of large sets of multidimensional vectors, and I noticed that the biggest bottleneck is the construction of the distance matrix. A naive implementation of this task is the following (here in Python):

import numpy as np
from scipy.spatial.distance import cosine

def create_dist_matrix(v):
    '''v = an array (N, d), where rows are the observations
    and columns the dimensions'''
    N = v.shape[0]
    D = np.zeros((N, N))
    for i in range(N):
        for j in range(i + 1):
            D[i, j] = cosine(v[i, :], v[j, :])  # scipy.spatial.distance.cosine
    return D

I was wondering what the best way is to add some parallelism to this routine. An easy way would be to split the outer loop across a number of jobs, e.g. if you have 10 processors, create 10 jobs for different ranges of i and then concatenate the results (a rough sketch of what I mean follows below). However, this "horizontal" solution doesn't seem quite right. Are there any other parallel algorithms (or existing libraries) for this task? Any help would be highly appreciated.
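For concreteness, the "horizontal" split I have in mind would look roughly like this with multiprocessing.Pool (just a sketch; helper names are illustrative):

import numpy as np
from scipy.spatial.distance import cosine
from multiprocessing import Pool

def _rows_block(args):
    # compute rows i_start..i_end-1 of the lower triangle
    v, i_start, i_end = args
    block = np.zeros((i_end - i_start, v.shape[0]))
    for i in range(i_start, i_end):
        for j in range(i + 1):
            block[i - i_start, j] = cosine(v[i, :], v[j, :])
    return i_start, block

def create_dist_matrix_parallel(v, n_jobs=10):
    N = v.shape[0]
    D = np.zeros((N, N))
    # equal row ranges; note that later rows hold more pairs, so the
    # workload is not balanced, and v gets pickled to every worker
    edges = np.linspace(0, N, n_jobs + 1, dtype=int)
    with Pool(n_jobs) as pool:
        for i_start, block in pool.imap_unordered(
                _rows_block, [(v, a, b) for a, b in zip(edges[:-1], edges[1:])]):
            D[i_start:i_start + block.shape[0], :] = block
    return D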

Answer 1:

It looks like scikit-learn has a parallel version of pdist called pairwise_distances:

from sklearn.metrics.pairwise import pairwise_distances

D = pairwise_distances(X=v, metric='cosine', n_jobs=-1)

where n_jobs=-1 specifies that all CPUs will be used.



Answer 2:

See @agartland's answer — you can specify n_jobs in sklearn.metrics.pairwise.pairwise_distances, or look for a clustering algorithm in sklearn.cluster that has an n_jobs parameter, e.g. sklearn.cluster.KMeans.

Still, if you are feeling adventurous, you can implement your own computation. For example, if you need the 1D (condensed) distance matrix for scipy.cluster.hierarchy.linkage, you can use:

#!/usr/bin/env python3
from multiprocessing import Pool
import numpy as np
from time import time as ts


data = np.zeros((100,10)) # YOUR data: np.array[n_samples x m_features]
n_processes = 4           # YOUR number of processors
def metric(a, b):         # YOUR dist function
    return np.sum(np.abs(a-b)) 


n = data.shape[0]
k_max = n * (n - 1) // 2  # maximum elements in 1D dist array
k_step = max(1, n ** 2 // 500)  # split the work into ~500 chunks (guard against a zero step for small n)
dist = np.zeros(k_max)    # resulting 1D dist array


def proc(start):
    # compute distances for the condensed (1D) indices in [k1, k2)
    chunk = []
    k1 = start
    k2 = min(start + k_step, k_max)
    for k in range(k1, k2):
        # recover (i, j) of the 2D distance matrix from the condensed index k
        i = int(n - 2 - int(np.sqrt(-8 * k + 4 * n * (n - 1) - 7) / 2.0 - 0.5))
        j = int(k + i + 1 - n * (n - 1) / 2 + (n - i) * ((n - i) - 1) / 2)
        # store the distance
        chunk.append(metric(data[i, :], data[j, :]))
    return k1, k2, chunk


ts_start = ts()
with Pool(n_processes) as pool:
    for k1, k2, res in pool.imap_unordered(proc, range(0, k_max, k_step)):
        dist[k1:k2] = res
        print("{:.0f} minutes, {:,}..{:,} out of {:,}".format(
            (ts() - ts_start)/60, k1, k2, k_max))


print("Elapsed %.0f minutes" % ((ts() - ts_start) / 60))
print("Saving...")
np.savez("dist.npz", dist=dist)
print("DONE")

Just so you know, the scipy.cluster.hierarchy.linkage implementation isn't parallel and its complexity is at least O(N^2). I'm not sure whether SciPy has a parallel implementation of this function.



Answer 3:

I doubt you will get it any faster than pdist in the scipy module. Probably this is why its documentation says:

Note that you should avoid passing a reference to one of the distance functions defined in this library. For example:

dm = pdist(X, sokalsneath)

would calculate the pair-wise distances between the vectors in X using the Python function sokalsneath. This would result in sokalsneath being called n choose 2 times, which is inefficient. Instead, the optimized C version is more efficient, and we call it using the following syntax:

dm = pdist(X, 'sokalsneath')

So no Python function is used if you use pdist(X, 'cosine'). When I run it, it seems to use only one core, so if you have a lot of cores you might get it faster. But bear in mind that, to achieve this, your native implementation has to be as fast as SciPy's. That won't be trivial. You'd better be patient or go for a different clustering method, e.g. an algorithm which supports a spatial index.



Answer 4:

In addition to what @agartland proposed, I like to use pairwise_distances or pairwise_distances_chunked with numpy.triu_indices to obtain the condensed distance vector. This is the exact output provided by scipy.spatial.distance.pdist.

It is important to note that the k kwarg for triu_indices controls the offset from the diagonal. The default value k=0 will return the diagonal of zeros as well as the real distance values, so it should be set to k=1 to avoid this.

For large datasets I have encountered an issue where pairwise_distances raises a ValueError from struct.unpack when returning values from a worker thread. Thus my use of pairwise_distances_chunked below.

import numpy as np
from sklearn.metrics.pairwise import pairwise_distances_chunked

gen = pairwise_distances_chunked(X, metric='cosine', n_jobs=-1)
Z = np.concatenate(list(gen), axis=0)
Z_cond = Z[np.triu_indices(Z.shape[0], k=1)]

For me this is much faster than using pdist, and it scales nicely with the number of available cores.

N.B. I think it is also worth pointing out that there was in the past some confusion about the arguments for scipy.cluster.hierarchy.linkage: the documentation at one point indicated that users could pass a condensed or squareform distance vector/matrix (linkage() function mistakes distance matrix as observation vectors #2614). This is in fact not the case; the values passed to linkage should be either the condensed distance vector or the m x n array of raw observations.



Answer 5:

If you decide to orchestrate the multiprocessing yourself, you may want to split the number of calculations evenly between CPUs so as to minimize the overall running time. Then the replies to this question on equally splitting a triangular matrix may come in handy.
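As a rough sketch of one way to do that even split (my own illustration, not from the linked question): row i of the lower triangle contributes i + 1 pairs, so you can cut the row range where the cumulative pair count crosses equally spaced targets.

import numpy as np

def split_triangle_rows(n, n_chunks):
    '''Split rows 0..n-1 of a lower triangle into n_chunks contiguous
    row ranges holding roughly equal numbers of (i, j) pairs.'''
    cum_pairs = np.cumsum(np.arange(1, n + 1))               # pairs up to and including row i
    targets = np.linspace(0, cum_pairs[-1], n_chunks + 1)[1:-1]
    cuts = np.searchsorted(cum_pairs, targets) + 1           # first row of each new chunk
    edges = np.concatenate(([0], cuts, [n]))
    return list(zip(edges[:-1], edges[1:]))                  # e.g. [(0, 6), (6, 9), (9, 10)] for n=10, n_chunks=3

Each (start, stop) range can then be handed to one worker.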