Fastest way to calculate minimum euclidean distanc

2019-03-16 09:40发布

I started a similar question on another thread, but then I was focusing on how to use OpenCV. Having failed to achieve what I originally wanted, I will ask here exactly what I want.

I have two matrices. Matrix a is 2782x128 and Matrix b is 4000x128, both unsigned char values. The values are stored in a single array. For each vector in a, I need the index of the vector in b with the closest euclidean distance.

Ok, now my code to achieve this:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

And attached are the files with sample matrices.

matrixa matrixb

I am using windows.h just to calculate the consuming time, so if you want to test the code in another platform than windows, just change windows.h header and change the way of calculating the consuming time.

This code in my computer is about 0.5 seconds. The problem is that I have another code in Matlab that makes this same thing in 0.05 seconds. In my experiments, I am receiving several matrices like matrix a every second, so 0.5 seconds is too much.

Now the matlab code to calculate this:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Ok. Matlab code is using that (x-a)^2 = x^2 + a^2 - 2ab.

So my next attempt was to do the same thing. I deleted my own code to make the same calculations, but It was 1.2 seconds approx.

Then, I tried to use different external libraries. The first attempt was Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

This Eigen code costs 1.2 approx for just the line that says: ab = a*b.transpose();

A similar code using opencv was used also, and the cost of the ab = a*b.transpose(); was 0.65 seconds.

So, It is real annoying that matlab is able to do this same thing so quickly and I am not able in C++! Of course being able to run my experiment would be great, but I think the lack of knowledge is what really is annoying me. How can I achieve at least the same performance than in Matlab? Any kind of soluting is welcome. I mean, any external library (free if possible), loop unrolling things, template things, SSE intructions (I know they exist), cache things. As I said, my main purpose is increase my knowledge for being able to code thinks like this with a faster performance.

Thanks in advance

EDIT: more code suggested by David Hammen. I casted the arrays to int before making any calculations. Here is the code:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

The entire process is now 0.6, and the casting loops at the beginning are 0.001 seconds. Maybe I did something wrong?

EDIT2: Anything about Eigen? When I look for external libs they always talk about Eigen and their speed. I made something wrong? Here a simple code using Eigen that shows it is not so fast. Maybe I am missing some config or some flag, or ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

This code is about 0.9 seconds.

3条回答
Animai°情兽
2楼-- · 2019-03-16 10:27

As you observed, your code is dominated by the matrix product that represents about 2.8e9 arithmetic operations. Yopu say that Matlab (or rather the highly optimized MKL) computes it in about 0.05s. This represents a rate of 57 GFLOPS showing that it is not only using vectorization but also multi-threading. With Eigen, you can enable multi-threading by compiling with OpenMP enabled (-fopenmp with gcc). On my 5 years old computer (2.66Ghz Core2), using floats and 4 threads, your product takes about 0.053s, and 0.16s without OpenMP, so there must be something wrong with your compilation flags. To summary, to get the best of Eigen:

  • compile in 64bits mode
  • use floats (doubles are twice as slow owing to vectorization)
  • enable OpenMP
  • if your CPU has hyper-threading, then either disable it or define the OMP_NUM_THREADS environment variable to the number of physical cores (this is very important, otherwise the performance will be very bad!)
  • if you have other task running, it might be a good idea to reduce OMP_NUM_THREADS to nb_cores-1
  • use the most recent compiler that you can, GCC, clang and ICC are best, MSVC is usually slower.
查看更多
\"骚年 ilove
3楼-- · 2019-03-16 10:33

One thing that is definitely hurting you in your C++ code is that it has a boatload of char to int conversions. By boatload, I mean up to 2*2782*4000*128 char to int conversions. Those char to int conversions are slow, very slow.

You can reduce this to (2782+4000)*128 such conversions by allocating a pair of int arrays, one 2782*128 and the other 4000*128, to contain the cast-to-integer contents of your char* a and char* b arrays. Work with these int* arrays rather than your char* arrays.

Another problem might be your use of int versus long. I don't work on windows, so this might not be applicable. On the machines I work on, int is 32 bits and long is now 64 bits. 32 bits is more than enough because 255*255*128 < 256*256*128 = 223.

That obviously isn't the problem.

What's striking is that the code in question is not calculating that huge 2728 by 4000 array that the Matlab code is creating. What's even more striking is that Matlab is most likely doing this with doubles rather than ints -- and it's still beating the pants off the C/C++ code.

One big problem is cache. That 4000*128 array is far too big for level 1 cache, and you are iterating over that big array 2782 times. Your code is doing far too much waiting on memory. To overcome this problem, work with smaller chunks of the b array so that your code works with level 1 cache for as long as possible.

Another problem is the optimization if (distance>min_distance) break;. I suspect that this is actually a dis-optimization. Having if tests inside your innermost loop is oftentimes a bad idea. Blast through that inner product as fast as possible. Other than wasted computations, there is no harm in getting rid of this test. Sometimes it is better to make apparently unneeded computations if doing so can remove a branch in an innermost loop. This is one of those cases. You might be able to solve your problem just by eliminating this test. Try doing that.

Getting back to the cache problem, you need to get rid of this branch so that you can split the operations over the a and b matrix into smaller chunks, chunks of no more than 256 rows at a time. That's how many rows of 128 unsigned chars fit into one of the two modern Intel chip's L1 caches. Since 250 divides 4000, look into logically splitting that b matrix into 16 chunks. You may well want to form that big 2872 by 4000 array of inner products, but do so in small chunks. You can add that if (distance>min_distance) break; back in, but do so at a chunk level rather than at the byte by byte level.

You should be able to beat Matlab because it almost certainly is working with doubles, but you can work with unsigned chars and ints.

查看更多
成全新的幸福
4楼-- · 2019-03-16 10:41

Matrix multiply generally uses the worst possible cache access pattern for one of the two matrices, and the solution is to transpose one of the matrices and use a specialized multiply algorithm that works on data stored that way.

Your matrix already IS stored transposed. By transposing it into the normal order and then using a normal matrix multiply, your are absolutely killing performance.

Write your own matrix multiply loop that inverts the order of indices to the second matrix (which has the effect of transposing it, without actually moving anything around and breaking cache behavior). And pass your compiler whatever options it has for enabling auto-vectorization.

查看更多
登录 后发表回答