How to optimize memory access pattern / cache misses for this array decimate/downsample program?
Asked Answered
F

5

9

I was recently asked about a piece of code to decimate/downsample the array "in-place". This "decimation" function takes an array of ints and stores an entry at an even index i in the array at the index i/2. It does it for all entries in the array.

This would move all even indexed entries in the original array to the first-half of the array. The rest of the array can then be initialized to 0. The overall result is an array that preserved all even index entries in the original array (by moving them to the first-half) and the second-half of the array being 0. This is apparently used to downsample signals in signal processing.

The code looks something like this:

void decimate (vector<int>& a) {
   int sz = a.size();
   for (int i =0; i < sz; i++) {
     if (i%2 == 0) {
        a[i/2] = a[i];
     }
    }
    for (int i =(sz-1)/2; i < sz; i++) a[i] = 0;
}

After suggesting basic improvements that keep certain variables in registers, I can't find any further way to optimize it but not sure if that can't be done.

Are there ways one could optimize the memory access pattern in the loop for better cache performance? Or any other ways to optimize the main copy operations of compressing/down-sampling the array into the first-half ? (e.g. by vectorization for platforms that support it)

   for (int i =0; i < sz; i++) {
     if (i%2 == 0) {
        a[i/2] = a[i];
     }
    }

Are there any loop transformations (such as tiling/strip-mining) that can lead to highly efficient code for such decimate loop?

EDIT: There are a few different ways suggested in the answers below that seem to take advantage of memset/fill or pointer arithmetic to gain speed efficiency. This question is main focused on whether there are well-defined Loop Transformations that can significantly improve locality or cache misses (e.g. if it were a loop-nest with two loops, one could potentially look into loop tiling to optimize cache misses)

Ferreira answered 10/9, 2017 at 8:17 Comment(0)
N
4

You have an array like this:

0 1 2 3 4 5 6 7 8 9

You want to end up with this:

0 2 4 6 8 0 0 0 0 0

I'd do it this way:

void decimate (vector<int>& a) {
  size_t slow = 1, fast = 2;

  // read the first half, write the first quarter
  size_t stop = (a.size()+1)/2;
  while (fast < stop) {
    a[slow++] = a[fast];
    fast += 2;
  }

  // read and clear the second half, write the second quarter
  stop = a.size();
  while (fast < stop) {
    a[slow++] = a[fast];
    a[fast++] = 0;
    a[fast++] = 0;
  }

  // clean up (only really needed when length is even)
  a[slow] = 0;
}

On my system, this is roughly 20% faster than your original version.

Now it's up to you to test and let us know if it's faster on your system!

Nobile answered 10/9, 2017 at 9:6 Comment(3)
You should check for empty vector before decimating, unless it crashes.Legit
Made an edit to the q to know if there are Loop transformations that apply.Ferreira
@JoeBlack: If you look carefully, I did transform the loop. I split it into two halves, so that each half does exactly the same thing on all elements with no internal branching, and I try to pass over each element the minimum number of times.Nobile
O
3

Here is a version using pointer arithmetic and placement new which uses the fact that std::vector uses a continuous memory layout internally:

void down_sample(std::vector<int> & v){ 
    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;
    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();
}

On my machine this code runs 3 times as fast as yours with disabled optimizations and about 30 % faster than your version when compiled with -o3 on gcc7.2. I tested this with an vector size of 20 000 000 elements.

And I think that in your version line:

for (int i =(sz-1)/2; i < sz; i++) a[i] = 0;

should be

for (int i =(sz-1)/2 + 1; i < sz; i++) a[i] = 0;

otherwise there will be set too many elements to zero.

Taking into account John Zwinck's question I did some quick test with memset and std::fill instead of placement new.

Here are the results:

n = 20000000
compiled with -o0
orginal 0.111396 seconds
mine    0.0327938 seconds
memset  0.0303007 seconds
fill    0.0507268 seconds

compiled with -o3
orginal 0.0181994 seconds
mine    0.014135 seconds
memset  0.0141561 seconds
fill    0.0138893 seconds

n = 2000
compiled with -o0
orginal 3.0119e-05 seconds
mine    9.171e-06 seconds
memset  9.612e-06 seconds
fill    1.3868e-05 seconds

compiled with -o3
orginal 5.404e-06 seconds
mine    2.105e-06 seconds
memset  2.04e-06 seconds
fill    1.955e-06 seconds

n= 500000000 (with -o3)
mine=     0,350732
memeset = 0.349054  
fill =    0.352398

It seems that memset is a little bit faster on large vectors and std::fill a little bit faster on smaller vectors. But the difference is very small.

Overcompensation answered 10/9, 2017 at 13:2 Comment(8)
This version is even faster then @JohnZwink 's one. I think you should use stop = begin + v.size() instead of taking address of element out of vector's range.Legit
Is int * a = new (half_position) int[size]() somehow better than memset(half_position, 0, size * sizeof(int) or std::fill(half_position, half_position + size, 0)?Nobile
I didn't take into account this two methods at first.Overcompensation
n = 20000000 with -o0 orginal 0.111396 seconds mine 0.0327938 seconds memset 0.0303007 seconds fill 0.0507268 seconds with -o3 orginal 0.0181994 seconds mine 0.014135 seconds memset 0.0141561 seconds fill 0.0138893 seconds n = 2000 with -o0 orginal 3.0119e-05 seconds mine 9.171e-06 seconds memset 9.612e-06 seconds fill 1.3868e-05 seconds with -o3 orginal 5.404e-06 seconds mine 2.105e-06 seconds memset 2.04e-06 seconds fill 1.955e-06 secondsOvercompensation
It seems that they have nearly the same execution time at least when compiled with optimizations enabled. (sorry for the ugly format. Somehow new line characters are ignored in comments. Maybe I should create a new answer for this?)Overcompensation
You can edit this answer and add new information. You don't need to post a new one. But I see you've already discovered that. :-)Renell
Yes, the begin index for the second loop should be i =(sz-1)/2 + 1. This is certainly faster (i had actually suggested to who asked me this q to use the std::fill_n to 0 the second half), but i was specifically interested in whether any loop transformations exist (in the literature) that can optimize any cache misses or memory accesses (Also updated the Question to reflect it)Ferreira
Removing conditionalpath from loops, should help avoiding cache misses for the instruction cache.Overcompensation
L
1

My version of one pass decimate():

void decimate (std::vector<int>& a) {
    const std::size_t sz = a.size();
    const std::size_t half = sz / 2;

    bool size_even = ((sz % 2) == 0);

    std::size_t index = 2;
    for (; index < half; index += 2) {
        a[index/2] = a[index];
    }

    for (; index < sz; ++index) {
        a[(index+1)/2] = a[index];
        a[index] = 0;
    }

    if (size_even && (half < sz)) {
        a[half] = 0;
    }
}

and tests for it:

#include <vector>
#include <iostream>
#include <cstddef>

void decimate(std::vector<int> &v);

void print(std::vector<int> &a) {
    std::cout << "{";
    bool f = false;

    for(auto i:a) {
        if (f) std::cout << ", ";
        std::cout << i;
        f = true;
    }
    std::cout << "}" << std::endl;
}

void test(std::vector<int> v1, std::vector<int> v2) {
    auto v = v1;
    decimate(v1);

    bool ok = true;

    for(std::size_t i = 0; i < v1.size(); ++i) {
        ok = (ok && (v1[i] == v2[i]));
    }

    if (ok) {
        print(v);
        print(v1);
    } else {
        print(v);
        print(v1);
        print(v2);
    }
    std::cout << "--------- " << (ok?"ok":"fail") << "\n" << std::endl;
}

int main(int, char**)
{
    test({},
        {});

    test({1},
        {1});

    test({1, 2},
        {1, 0});

    test({1, 2, 3},
        {1, 3, 0});

    test({1, 2, 3, 4},
        {1, 3, 0, 0});

    test({1, 2, 3, 4, 5},
        {1, 3, 5, 0, 0});

    test({1, 2, 3, 4, 5, 6},
        {1, 3, 5, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7},
        {1, 3, 5, 7, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8},
        {1, 3, 5, 7, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9},
        {1, 3, 5, 7, 9, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
        {1, 3, 5, 7, 9, 0, 0, 0, 0, 0});

    test({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
        {1, 3, 5, 7, 9, 11, 0, 0, 0, 0, 0});

    return 0;
}
Legit answered 10/9, 2017 at 8:43 Comment(4)
On my system this is about the same speed as the original. What about on yours?Nobile
On my system my version takes about 72% of original version time, and yours takes about 59% of original version time.Legit
And the @Hatatister's version takes only 46% time on 500 000 000 elements vector.Legit
Edited the q to reflect the focus.Ferreira
G
0

Don't go up to sz, if you set it to zero afterwards.

If sz is even goto sz/2, if not to (sz-1)/2.

for (int i =0; i < sz_half; i++) 
        a[i] = a[2*i];
Ginkgo answered 10/9, 2017 at 9:43 Comment(1)
Yes, this should be fine, but i don't expect significant speedup due to it as we are skipping the copy when i is odd, i.e. the costly operations continue to exist (except for the if conditional inside the loop which compiler will likely optimize away).Ferreira
G
0

I compared all the answers given here. I used the intel compiler icc version 15.0.3. Optimization level O3 was used.

Orig: Time difference [micro s] = 79506
JohnZwinck: Time difference [micro s] = 69127   
Hatatister: Time difference [micro s] = 79838
user2807083: Time difference [micro s] = 80000
Schorsch312: Time difference [micro s] = 84491

All times refer to a vector with length 100000000.

#include <vector>
#include <cstddef>
#include <iostream>
#include <chrono>

const int MAX = 100000000;

void setup(std::vector<int> & v){
    for (int i = 0 ; i< MAX; i++) {
        v.push_back(i);
    }
}


void checkResult(std::vector<int> & v) {
    int half_length;
    if (MAX%2==0)
        half_length = MAX/2;
    else
        half_length = MAX-1/2;

    for (int i = 0 ; i< half_length; i++) {
        if (v[i] != i*2)
            std::cout << "Error: v[i]="  << v[i] << " but should be "  <<     2*i <<  "\n";
    }

    for (int i = half_length+1; i< MAX; i++) {
        if (v[i] != 0)
            std::cout << "Error: v[i]="  << v[i] << " but should be 0 \n";
    }
}

void down_sample(){
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;
    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();

    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "Orig: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
    checkResult(v);
}

void down_sample_JohnZwinck () {
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    size_t slow = 1, fast = 2;

    // read the first half, write the first quarter
    size_t stop = (v.size()+1)/2;
    while (fast < stop) {
        v[slow++] = v[fast];
        fast += 2;
    }

    // read and clear the second half, write the second quarter
    stop = v.size();
    while (fast < stop) {
        v[slow++] = v[fast];
        v[fast++] = 0;
        v[fast++] = 0;
    }

    // clean up (only really needed when length is even)
    v[slow] = 0;

    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "JohnZwinck: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
    checkResult(v);

}

void down_sample_Schorsch312(){ 
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();
    int half_length;

    if (v.size()%2==0)
        half_length = MAX/2;
    else
        half_length = MAX-1/2;

    for (int i=0; i < half_length; i++) 
        v[i] = v[2*i];
    for (int i=half_length+1; i< MAX; i++) 
        v[i]=0;

    auto duration = std::chrono::steady_clock::now() - start_time;
std::cout << "Schorsch312: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;
}

void down_sample_Hatatister(){ 
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    int * begin = &v[0];
    int * stop =  begin + v.size();
    int * position = begin + 2;
    int * half_position = begin +1;

    while( position < stop){
        *half_position = *position;
        ++half_position;
        position += 2;
    }
    size_t size = v.size()/2;
    int * a = new (half_position) int[size]();
    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "Hatatister: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;

    checkResult(v);
}

void down_sample_user2807083 () {
    std::vector<int> v;
    setup(v);

    auto start_time = std::chrono::steady_clock::now();

    const std::size_t sz = v.size();
    const std::size_t half = sz / 2;

    bool size_even = ((sz % 2) == 0);

    std::size_t index = 2;

    for (; index < half; index += 2) {
        v[index/2] = v[index];
    }

    for (; index < sz; ++index) {
        v[(index+1)/2] = v[index];
        v[index] = 0;
    }

    if (size_even && (half < sz)) {
        v[half] = 0;
    }
    auto duration = std::chrono::steady_clock::now() - start_time;
    std::cout << "user2807083: Time difference [micro s] = " << std::chrono::duration_cast<std::chrono::microseconds>(duration).count() <<std::endl;

    checkResult(v);

}

int main () {
    down_sample();
    down_sample_JohnZwinck ();
    down_sample_Schorsch312();
    down_sample_Hatatister();
    down_sample_user2807083();
}
Ginkgo answered 11/9, 2017 at 8:17 Comment(1)
Interesting. This shows, that performance optimisation can be hard. And performance gains can be unrelated to your code and can be caused by other effects. Even something like defining an environment variable can have an impact. There was an interesting CppCon Talk on this topic, but I cannot find the link anymore.Overcompensation

© 2022 - 2024 — McMap. All rights reserved.