Fastest array-by-array nearest neighbour search

5

1

Intro - Nearest neighbour comparison between two arrays is something that can be done very efficiently when only 1 variable per array is involved, because it can be done by sorting and then performing a rolling comparison. However, for a 2 variable comparison this sort operation cannot be done anymore and another smart way has to be thought of. Hence the challenge:

Challenge - The challenge is to write a code that can, for each row in array A, find a row in array B such that (A[i,1]-B[j,1])^2+(A[i,2]-B[j,2])^2 is minimal and that then stores the corresponding row number j. The perfect minimum (0) is a perfect match of A[i,1] with B[j,1] and of A[i,2] with B[j,2].

As a code example I have a very clunky loop written in R

N=nrow(SetA)
for (i in 1:N){

      #calculate vector with distance of row i to the entire SetB
      vec=with(SetB,(SetA[i,x]-x)^2+(SetA[i,y]-y)^2)

      #calculate minimum of that vector and store the index
      indexVec[i,ind:=which.min(vec)]
}

Some sidenotes - If it matters (not sure it will) assume that both arrays have 10k rows. Ties can be decided at 'random' e.g. determined by processing sequence or whatever way is more convenient in your code.

Example in/out - For clarity, here is the sample input

SetA=[1 3;2 4;3 5;4 6;5 7]
SetB=[2 2.5; 4 3.1; 2 2.0; 3 3.0; 6 5.0;]

and the expected sample output

indexVec=[1;4;4;5;5]

where the values in indexVec indicate where the nearest neighbour of row i in SetA is located in SetB.

Fastest code (as tested on my i5-4210U) wins. The testset will consist of 2 arrays of 10k rows and 2 columns filled with floating point numbers between -1E10 and +1E10. Of course seeded with the same seed to remove any unfairness.

Feel free to use whatever language, but I am also very interested in an optimized version of the R code above.

Michiel

Posted 2015-02-12T07:19:51.487

Reputation: 151

Would you happen to have some sample input/output? I'm not quite sure I fully understand the problem – Sp3000 – 2015-02-12T07:22:33.747

@Sp3000 I have added the sample input and output (not in R syntax, but I guess you get what it means) – Michiel – 2015-02-12T07:31:08.940

@Michiel So basicly the arrays are 2 dimensional arrays? And the sum of both elements in a row of array A should be closest to the sum of both elements in a row of array B right? – Teun Pronk – 2015-02-12T08:20:38.363

@TeunPronk I have edited the question a bit to hopefully clarify this issue. Arrays are indeed 2 dimensional, but rather than finding the closest in sum of both elements I want the closest in distance as defined by the squared difference – Michiel – 2015-02-12T08:41:31.470

1So the input is two arrays of 2D points using a floating point type, and the task is to find, for each 2D point in the second array, the one in the first array which is nearest to it according to the L2 norm? How should ties be broken? – Peter Taylor – 2015-02-12T09:48:53.850

@PeterTaylor I didn't realize before that (mathematically) the L2 norm is the same thing, but you are absolutely right. Ties are allowed to be broken at random, i.e. whatever the script does. – Michiel – 2015-02-12T10:27:59.573

What kind of data will you test the submissions on? Uniformly random floats from some interval? – Zgarb – 2015-02-12T10:32:14.303

@Zgarb 2D arrays of 10k rows filled with randomly generated floating point numbers between -1E10 and +1E10 – Michiel – 2015-02-12T10:34:29.660

If I posted a C code, how would you pass in the random array values? should I fill them myself? – rorlork – 2015-02-12T11:18:42.313

@rcrmn I will use the same set as for different languages, loaded from file – Michiel – 2015-02-12T11:23:48.787

Answers

7

C++, .0007 seconds

Uses a simple grid to place all the points of B, then looks up the points of A starting at the grid point to which it maps.

#include <stdio.h>
#include <stdlib.h>
#include <thread>
#include <sys/time.h>

double mytime() {
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return tv.tv_sec + tv.tv_usec * 1e-6;
}

typedef float vector[2];

#define T 4
// N must be a multiple of T
#define N 10000

// Width of grid.  Should be about sqrt(N)
#define W 100

vector a[N];
vector b[N];
int min[N];

float dist(vector x, vector y) {
    float a = x[0]-y[0];
    float b = x[1]-y[1];
    return a*a+b*b;
}

int mindist_test(vector x, vector *y, int n) {
    float best = dist(x, y[0]);
    int bestidx = 0;
    for(int i = 1; i < n; i++) {
        float d = dist(x, y[i]);
        if(best > d) {
            best = d;
            bestidx = i;
        }
    }
    return bestidx;
}

typedef struct cell cell;
struct cell {
    vector v;
    cell *next;
};

cell *grid[W][W];
cell cells[N];

float minx, miny, scalex, scaley, scale, scale2;

void build() {
    // Find bounds
    minx = b[0][0];
    miny = b[0][1];
    float maxx = b[0][0];
    float maxy = b[0][1];
    for(int i = 1; i < N; i++) {
        if(minx > b[i][0])
            minx = b[i][0];
        if(maxx < b[i][0])
            maxx = b[i][0];
        if(miny > b[i][1])
            miny = b[i][1];
        if(maxy < b[i][1])
            maxy = b[i][1];
    }
    scalex = (maxx - minx + .00001) / W;
    scaley = (maxy - miny + .00001) / W;
    scale = scalex < scaley ? scalex : scaley;
    scale2 = scale * scale;

    // Place B vectors in grid
    for(int i = 0; i < N; i++) {
        cells[i].v[0] = b[i][0];
        cells[i].v[1] = b[i][1];
        int x = int((b[i][0] - minx) / scalex);
        int y = int((b[i][1] - miny) / scaley);
        cells[i].next = grid[x][y];
        grid[x][y] = &cells[i];
    }
}

int find(vector v) {
    // Start checking at grid[x][y]
    int x = int((v[0] - minx) / scalex);
    int y = int((v[1] - miny) / scaley);

    float mindist = 1e30;
    cell *mincell;

    // Look in grid cells within radius r
    for(int r = 1; r < W; r++) {
        for(int i = x-r; i <= x+r; i++) {
            if(i < 0 || i >= W) continue;
            for(int j = y-r; j <= y+r; j++) {
                if(j < 0 || j >= W) continue;
                for(cell *c = grid[i][j]; c != NULL; c = c->next) {
                    float d = dist(v, c->v);
                    if(mindist > d) {
                        mindist = d;
                        mincell = c;
                    }
                }
            }
        }
        // If we've found a point close enough, stop.
        // The next iterations' points are guaranteed to be at least distance r*scale away.
        if(mindist < (r*r)*scale2)
            break;
    }
    return mincell - cells;
}

void thr(int t) {
    int base = N/T*t;
    for(int i = 0; i < N/T; i++) {
        min[base+i] = find(a[base+i]);
    }
}

int main(int argc, char *argv[]) {
    // Initialize
    for(int i = 0; i < N; i++) {
        a[i][0] = rand() / (RAND_MAX/1000000.);
        a[i][1] = rand() / (RAND_MAX/1000000.);
        b[i][0] = rand() / (RAND_MAX/1000000.);
        b[i][1] = rand() / (RAND_MAX/1000000.);
    }

    // Run & time
    double start = mytime();
    build();
    std::thread* threads[T];
    for(int t = 0; t < T; t++) {
        threads[t] = new std::thread(thr, t);
    }
    for(int t = 0; t < T; t++) {
        threads[t]->join();
    }
    double stop = mytime();
    printf("time: %f\n", stop - start);

    // Check results
    for(int i = 0; i < N; i++) {
        int j = min[i];
        int k = mindist_test(a[i], b, N);
        if(dist(a[i], b[j]) != dist(a[i], b[k])) {
            printf("BAD %d: %d %f %f\n", i, j, dist(a[i], b[j]), dist(a[i], b[k]));
        }
    }
}

Compile with

g++ -O3 -std=c++11 -stdlib=libc++ grid.cc -o grid

Keith Randall

Posted 2015-02-12T07:19:51.487

Reputation: 19 865

5

Mathematica 0.035230 secs

Update

The latest version required

  • 0.035230 sec for 10k rows of data
  • 0.419461 sec for 100k rows
  • 4.975199 sec for 1 million rows

nf = Nearest[Thread[B -> Range[n]]]

The nearest function, nf, delivers a 100 fold speedup over the prior approach. This is due to the creation of a lookup table for positions:

r[m_]:= RandomReal[{-E^10,E^10},{m,2}]
n = 10000;
A = r[n];
B = r[n];
(nf = Nearest[Thread[B -> Range[n]]]; 
 Flatten@(nf[#, 1][[1]] & /@ A);) // AbsoluteTiming

{0.035230, Null}


Example

n = 10;
A = r[n]
B = r[n]
(nf = Nearest[Thread[B -> Range[n]]]; Flatten@(nf[#, 1][[1]] & /@ A))

(*A *)
{{3944.81, 10711.}, {-12978.5, -7807.64}, {-7309.13, 15.5648}, {-2695.33, 6309.87}, {14819.7, 21796.6}, {20681.7, -296.336}, {-7396.73, -9588.16}, {73.5495, -7730.14}, {-20361.8, -4616.41}, {21576.4, -18024.}}

(*B *)
{{21382., -597.126}, {5872.82, 6305.16}, {-10502.4,2378.9}, {-5643.86, 19461.4}, {-4986.97, -2173.96}, {10024.5, 14230.6}, {6747.26, 6242.93}, {15714.1, -18345.4}, {-21971., -5052.04}, {-17088.2, 3853.11}}

(*Output *)
{2, 9, 5, 2, 6, 1, 5, 5, 9, 8}


I'm including 2 earlier approaches to highlight the improvements in speed due to seemingly minor tweaks.

Prior approach

This takes about 3.5 seconds for arrays of {10^4,2},

This makes it 10 times slower than the above approach, but 10 times faster than my first, naive, implementation.

The key improvement was to look for positions through one call. It also uses the built-in Nearest function (but doesn't exploit it nearly as well as the current version).

r[m_]:= RandomReal[{-E^10,E^10},{m,2}]
n=10000;
A=r[n];
B=r[n];
nf = Nearest[B];
Flatten@Position[B, #] & /@ (nf[#, 1][[1]] & /@ A); // AbsoluteTiming

{3.554101, Null}


First attempt

This unoptimized version takes 40 seconds for 10k rows.

r:= RandomReal[{-E^10,E^10},{10^4,2}]
nearness[{r1_,c1_},array_]:=Total[({r1,c1}-#)^2]&/@array
nearestPosition[aArray_,bArray_]:=Position[z=nearness[#,bArray],Min[z]][[1,1]]&/@aArray

Test

A = r
B = r
nearestPosition[A, B]; // AbsoluteTiming

{40.141484, Null}

DavidC

Posted 2015-02-12T07:19:51.487

Reputation: 24 524

4

Python 3

from scipy import spatial
import numpy as np

def nearest_neighbour(points_a, points_b):
    tree = spatial.cKDTree(points_b)
    return tree.query(points_a)[1]

0.05 seconds for 10k rows of data, 0.7 seconds for 100k rows, and 7.6 seconds for a million rows.

Originally I wanted to implement the approach myself, but why do so, when NumPy and SciPy offer everything you want.

The idea is to build a KD-tree of the points b. Using this KD-tree you can effectively search for the nearest points, since you can restrict the search space quite a bit. I'm not completely sure about the implementation of SciPy, but you can check out one algorithm at Wikipedia.

The complexity of this should be about O(N*log(N)) for computing the KD-tree (N is the number of entries) and O(log(N)) for a single search on average. If N = #a = #b, the total complexity is O(N*log(N)), which is a lot better than the brute force approach with Theta(N^2)

Tests

The output of example in Michiel's post is [0 3 3 4 4] (Off by one because R).

a = np.array([[1, 3],[2, 4],[3, 5],[4, 6],[5, 7]])
b = np.array([[2, 2.5],[4, 3.1],[2, 2.0],[3, 3.0],[6, 5.0]])
print(nearest_neighbour(a, b))

Testing it for n = 10k:

import time
n = 10**4
points_a = np.random.rand(n, 2)
points_b = np.random.rand(n, 2)

start_time = time.time()
result = nearest_neighbour(points_a, points_b)
end_time = time.time()
print("n = {:7d}, {:.3f} seconds".format(n, end_time - start_time))

You can view the complete program at Gist.

Jakube

Posted 2015-02-12T07:19:51.487

Reputation: 21 462

Upvoted for: Originally I wanted to implement the approach myself, but why do so, when NumPy and SciPy offer everything you want. – johnchen902 – 2015-02-12T18:19:18.000

3

R

You can vectorize your operations for a bit more speed. If you need something faster, move to a different language lol.

# Original data
SetA = matrix(c(1, 3, 2, 4, 3, 5, 4, 6, 5, 7), nrow = 2)
SetB = matrix(c(2, 2.5, 4, 3.1, 2, 2.0, 3, 3.0, 6, 5.0), nrow = 2)

# 10k random sample data
SetA = matrix(sample(1:20, 20000, replace = T), nrow = 2)
SetB = matrix(sample(1:20, 20000, replace = T), nrow = 2)

apply(X = SetA, MARGIN = 2, FUN = function(z) which.min((SetB[1,]-z[1])^2 + (SetB[2,]-z[2])^2))

Takes me ~2.6s to run 10k points.

edit:

Vectorized it a bit more. This version is another 0.3s faster than the solution above.

apply(X = SetA, MARGIN = 2, FUN = function(z) {which.min(colSums((z-SetB)^2))})

Vlo

Posted 2015-02-12T07:19:51.487

Reputation: 806

1

C++ with AVX, brute force, 0.013 seconds

10K isn't that big, right? Just plow through the O(n^2) algorithm using AVX instructions to do it really fast. I'm using single-precision floating point, I hope that is ok.

Update: now with threads!

#include <stdio.h>
#include <immintrin.h>
#include <thread>
#include <sys/time.h>

double mytime() {
  struct timeval tv;
  gettimeofday(&tv, NULL);
  return tv.tv_sec + tv.tv_usec * 1e-6;
}

typedef float vector[2];

// for each 0<=i<4, finds y[j] closest to x[i] 0<=j<n.
// puts the resulting j's in r[0..3].
void mindist(vector *xp, vector *yp, int n, int *r) {
    // Load 4 vectors from x
    __m256 x = _mm256_load_ps(&xp[0][0]);

    // Best distance & its index found so far, for the 4 x vectors.
    // Packed strangely in the 8 32-bit-float-sized slots.
    // 0: dist0
    // 1: dist1
    // 2: idx0
    // 3: idx1
    // 4: dist2
    // 5: dist3
    // 6: idx2
    // 7: idx3
    __m256 best = _mm256_set_ps(0, 0, 1e30, 1e30, 0, 0, 1e30, 1e30);

    // Indexes are floats, integer addition doesn't come until AVX2
    __m256 idx = _mm256_set_ps(3,2,0,0,1,0,0,0);
    __m256 idxinc = _mm256_set_ps(4,4,0,0,4,4,0,0);

    // Process 4 y vectors at a time
    for(int i = 0; i < n; i += 4) {
        __m256 y = _mm256_load_ps(&yp[i][0]);

        for(int j = 0; j < 4; j++) {
            // Compute difference of vectors
            __m256 diff = _mm256_sub_ps(x, y);

            // Square each component
            __m256 square = _mm256_mul_ps(diff, diff);

            // Add pairs.  Results are in floats 0,1,4,5
            __m256 sum = _mm256_hadd_ps(square, square);
            // Set remaining slots using indexes
            sum = _mm256_blend_ps(sum, idx, 0b11001100);

            // Compare with best distance so far
            __m256 cmp = _mm256_cmp_ps(sum, best, _CMP_LT_OQ);
            // Duplicate comparison masks to index slots
            cmp = _mm256_permute_ps(cmp, 0b01000100);

            // Take new distance/index if it is smaller
            best = _mm256_blendv_ps(best, sum, cmp);

            // Shuffle b & idx vectors around.  This trickery
            // puts all 4 b vectors in all 4 slots.
            if(j == 0 || j == 2) {
                // abcd -> cdab
                y = _mm256_permute2f128_pd(y, y, 0x01);
                idx = _mm256_permute2f128_pd(idx, idx, 0x01);
            } else {
                // abcd -> badc
                y = _mm256_permute_pd(y, 0b0101);
                idx = _mm256_permute_ps(idx, 0b10110000);
            }
        }
        idx = _mm256_add_ps(idx, idxinc);
    }
    // Extract results
    best = _mm256_cvtps_epi32(best);
    r[0] = _mm256_extract_epi32(best, 2);
    r[1] = _mm256_extract_epi32(best, 3);
    r[2] = _mm256_extract_epi32(best, 6);
    r[3] = _mm256_extract_epi32(best, 7);
}

// number of threads
#define T 4

// N must be a multiple of 4*T
#define N 10000

vector a[N] __attribute__((aligned(32)));
vector b[N] __attribute__((aligned(32)));
int min[N];

float dist(vector x, vector y) {
    float a = x[0]-y[0];
    float b = x[1]-y[1];
    return a*a+b*b;
}

int mindist_test(vector x, vector *y, int n) {
    float best = dist(x, y[0]);
    int bestidx = 0;
    for(int i = 1; i < n; i++) {
        float d = dist(x, y[i]);
        if(best > d) {
            best = d;
            bestidx = i;
        }
    }
    return bestidx;
}

void thr(int t) {
    for(int i = 0; i < N/T; i += 4) {
        mindist(&a[N/T*t+i], b, N, &min[N/T*t+i]);
    }
}

int main(int argc, char *argv[]) {
    // Initialize
    for(int i = 0; i < N; i++) {
        a[i][0] = rand() / (RAND_MAX/1000000.);
        a[i][1] = rand() / (RAND_MAX/1000000.);
        b[i][0] = rand() / (RAND_MAX/1000000.);
        b[i][1] = rand() / (RAND_MAX/1000000.);
    }

    // Run & time
    double start = mytime();
    std::thread* threads[T];
    for(int t = 0; t < T; t++) {
        threads[t] = new std::thread(thr, t);
    }
    for(int t = 0; t < T; t++) {
        threads[t]->join();
    }
    double stop = mytime();
    printf("time: %f\n", stop - start);

    // Check results
    for(int i = 0; i < N; i++) {
        int j = min[i];
        int k = mindist_test(a[i], b, N);
        if(dist(a[i], b[j]) != dist(a[i], b[k])) {
            printf("BAD %d: %d %f %f\n", i, j, dist(a[i], b[j]), dist(a[i], b[k]));
        }
    }
}

Compiled on my mac with

clang++ -O3 -mavx -std=c++11 -stdlib=libc++ avx4.cc -o avx4

Keith Randall

Posted 2015-02-12T07:19:51.487

Reputation: 19 865

0

C++11

(Naive approach with O(N^2) complexity. With a kd-tree it could probably be improved a lot. I'll work on it in the future)

Okay, this C++ code should work a lot better than the matlab code posted below. In my computer it runs in ~0.5s for the 10k values. I've compiled it in VS, but it should work in linux, although I don't know the command line code to compile it properly, I guess it will need -std=c++11 and maybe -pthread for the threading.

As for the loading from file, I would need the format of the loaded file to prepare it for you. As it is right now, it creates a rng and fills the two arrays with random floats.

#include <chrono>
#include <iostream>
#include <limits>
#include <thread>
#include <random>

#define ROWS 10000
#define sqdiff(a,b) (((b)-(a))*((b)-(a)))
void NN(float a[][2], float b[][2], int c[], int length)
{
    for (int i = 0; i < length; ++i)
    {
        int k = 0;
        float m = std::numeric_limits<float>::infinity();
        float x = a[i][0];
        float y = a[i][1];
        for (int j = 0; j < ROWS; j += 4)
        {
            {
                float d = sqdiff(x, b[j][0]) + sqdiff(y, b[j][1]);
                if (d < m) k = j;
            }
            {
                float d = sqdiff(x, b[j+1][0]) + sqdiff(y, b[j+1][1]);
                if (d < m) k = j;
            }
            {
                float d = sqdiff(x, b[j+2][0]) + sqdiff(y, b[j+2][1]);
                if (d < m) k = j;
            }
            {
                float d = sqdiff(x, b[j+3][0]) + sqdiff(y, b[j+3][1]);
                if (d < m) k = j;
            }
        }
        c[i] = k;
    }
}

void InitializeRandomArray(float* a, int length)
{
    std::default_random_engine generator;
    std::uniform_real_distribution<float> distribution;

    for (int i = 0; i < length; ++i)
    {
        a[i] = distribution(generator);
    }
}

int main(int argc, char** argv)
{
    float a[ROWS][2];
    float b[ROWS][2];
    int c[ROWS];

    std::thread* threads[4];

    // Initialization
    InitializeRandomArray((float*)a, ROWS*2);
    InitializeRandomArray((float*)b, ROWS*2);


    // Time counter
    std::chrono::time_point<std::chrono::system_clock> start, end;
    start = std::chrono::system_clock::now();

    // Computation
    for (int t = 0; t < 4; ++t)
    {
        threads[t] = new std::thread(NN, &(a[ROWS/4*t]), b, &(c[ROWS/4*t]), ROWS/4);
    }

    // Thread joining
    for (int t = 0; t < 4; ++t)
    {
        threads[t]->join();
        delete threads[t];
    }

    // Output of time used
    end = std::chrono::system_clock::now();
    std::chrono::duration<double> elapsed_seconds = end - start;
    std::cout << "elapsed time: " << elapsed_seconds.count() << "s" << std::endl;

    return 0;
}

Matlab

This version runs as slow as a turtle, it took about 6s for 1k arrays in octave. In matlab i believe it may run faster, but can't check it right now as I don't have it installed in this computer.

function [c] = NN(a, b)
    c = zeros(size(a,1),1);

    for i=1:size(a,1)
        [_,c(i)] = min(sum((b-a(i,:)).^2, 2));
    end
end

I'm sure this can be made very optimized in C but I lack the time to implement it right now.

rorlork

Posted 2015-02-12T07:19:51.487

Reputation: 1 421

Good luck to your kd-tree plans. But keep in mind, that my Python 3 solution uses compiled C-code in the background. SciPy is really optimized, so I would be really surprised, if you can achieve faster times (single threaded at least). – Jakube – 2015-02-12T17:05:25.617

Well, I already guessed so, but I would try using multithreaded code. Also, by using an interpreted code I guess there will be some overhead. But if going with the O(N^2) approach I have a timing similar to yours, I believe it could be improved, even if it's only from the multithreading. – rorlork – 2015-02-12T17:21:38.613

1The overhead will be quite small though. The entries in the created NumPy arrays have a fixed type (float) and the SciPy is only a simply wrapper around C-code. If you try coding it with multithreading, there are a few libaries, that can create a kd-tree multithreaded. And maybe already can perform nearest neighbour queries. – Jakube – 2015-02-12T17:33:16.873

0

R

N=nrow(SetA)
for (i in 1:N){
      vec=with(SetB,(SetA[i,x]-x)^2+(SetA[i,y]-y)^2)

      indexVec[i,ind:=which.min(vec)]
}

18 seconds for 10k rows

I am quite sure that it should be possible to make this much more efficient

Michiel

Posted 2015-02-12T07:19:51.487

Reputation: 151