Sparse array compression using SIMD (AVX2)

I have a sparse array a (mostly zeroes):

unsigned char a[1000000]; 

and I would like to create an array b of indexes to the non-zero elements of a using SIMD instructions on the Intel x64 architecture with AVX2. I'm looking for tips on how to do this efficiently. Specifically, are there SIMD instruction(s) to get the positions of consecutive non-zero elements in a SIMD register, arranged contiguously?

Hydropic answered 24/9, 2013 at 1:9 Comment(6)
Not directly, but you could pcmpeqb it against zero, then pmovmskb that to a normal register, and extract the first index with bsf (and then the second and so forth, hopefully not too many)Johen
You'll need to be more specific than just SIMD - what architecture are you targeting? x86, ARM, PowerPC, POWER and some GPGPUs all have different SIMD extensions. Also x86 has multiple SIMD extensions: MMX, SSE, SSE2, SSE3, SSSE3, SSE4, AVX, AVX2, etc. (Note that AVX2 has SIMD instructions that might be useful in this context).Armament
@Paul R Sorry about that. I edited my question - AVX2 is acceptable.Hydropic
@harold That was my plan B. I would rather use something faster, maybe a permute instruction based on compare mask, which would arrange all non-zero elements contiguously before I write them to memory.Hydropic
@harold I meant: get mask of non-zeros, AND it with [0,1,2..N] + [i,i,i..i], where i == size(b), permute to arrange them contiguously, write to memory (append to b) and update i.Hydropic
I don't know of any good way to do it that way, but it might be possible..Johen

Five methods to compute the indices of the nonzeros are:

  • Semi vectorized loop: Load a SIMD vector with chars, compare with zero and apply a movemask. Use a small scalar loop if any of the chars is nonzero (also suggested by @stgatilov). This works well for very sparse arrays. Function arr2ind_movmsk in the code below uses BMI1 instructions for the scalar loop.

  • Vectorized loop: Intel Haswell processors and newer support the BMI1 and BMI2 instruction sets. BMI2 contains the pext instruction (parallel bits extract), which turns out to be useful here. See arr2ind_pext in the code below.

  • Classic scalar loop with if statement: arr2ind_if.

  • Scalar loop without branches: arr2ind_cmov.

  • Lookup table: @stgatilov shows that it is possible to use a lookup table instead of the pext and other integer instructions. This might work well; however, the lookup table is quite large (2^16 entries of 16 bytes each, i.e. 1 MiB): it doesn't fit in the L1 cache. Not tested here. See also the discussion here.


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:   
              ./a.out 20000 25
*/

#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h> 
#include <string.h>


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0, k;
   __m256i msk;
   m0=0;
   for (i=0;i<n;i=i+32){                              /* Load 32 bytes and compare with zero:           */
      msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
      k=_mm256_movemask_epi8(msk);
      k=~k;                                           /* Search for nonzero bits instead of zero bits.  */
      while (k){
         ind[m0]=i+_tzcnt_u32(k);                     /* Count the number of trailing zero bits in k.   */
         m0++;
         k=_blsr_u32(k);                              /* Clear the lowest set bit in k.                 */
      }
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   uint64_t     cntr_const = 0xFEDCBA9876543210;
   __m256i      shft       = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
   __m256i      vmsk       = _mm256_set1_epi8(0x0F);
   __m256i      cnst16     = _mm256_set1_epi32(16);
   __m256i      shf_lo     = _mm256_set_epi8(0x80,0x80,0x80,0x0B,   0x80,0x80,0x80,0x03,   0x80,0x80,0x80,0x0A,   0x80,0x80,0x80,0x02,
                                             0x80,0x80,0x80,0x09,   0x80,0x80,0x80,0x01,   0x80,0x80,0x80,0x08,   0x80,0x80,0x80,0x00);
   __m256i      shf_hi     = _mm256_set_epi8(0x80,0x80,0x80,0x0F,   0x80,0x80,0x80,0x07,   0x80,0x80,0x80,0x0E,   0x80,0x80,0x80,0x06,
                                             0x80,0x80,0x80,0x0D,   0x80,0x80,0x80,0x05,   0x80,0x80,0x80,0x0C,   0x80,0x80,0x80,0x04);
   __m128i      pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80,  0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);                                            

   __m256i      i_vec      = _mm256_setzero_si256();
   m0=0;
   for (i=0;i<n;i=i+16){
      __m128i   v          = _mm_load_si128((__m128i *)&a[i]);                     /* Load 16 bytes.                                                                               */
      __m128i   msk        = _mm_cmpeq_epi8(v,_mm_setzero_si128());                /* Generate 16x8 bit mask.                                                                      */
                msk        = _mm_srli_epi64(msk,4);                                /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_shuffle_epi8(msk,pshufbcnst);                     /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_xor_si128(msk,_mm_set1_epi32(-1));                /* Invert 16x4 mask.                                                                            */
      uint64_t  msk64      = _mm_cvtsi128_si64x(msk);                              /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
      int       p          = _mm_popcnt_u64(msk64)>>2;                             /* p is the number of nonzeros in 16 bytes of a.                                                */
      uint64_t  cntr       = _pext_u64(cntr_const,msk64);                          /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
                                                                                   /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.                    */  
      __m256i   cntr256    = _mm256_set1_epi64x(cntr);
                cntr256    = _mm256_srlv_epi64(cntr256,shft);
                cntr256    = _mm256_and_si256(cntr256,vmsk);
      __m256i   cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
      __m256i   cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
                cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
                cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);

                             _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);     /* Note that the stores of iteration i and i+16 may overlap.                                                         */
                             _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi);   /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
                m0         = m0+p;
                i_vec      = _mm256_add_epi32(i_vec,cnst16);
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      if (a[i]!=0){
         ind[m0]=i;
         m0=m0+1;
      }
   }
   *m=m0;
   return 0;
}


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      ind[m0]=i;
      m0=(a[i]==0)? m0 : m0+1;   /* Compiles to cmov instruction. */
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   for (i=0;i<m;i++) printf("i=%d,  ind[i]=%d   a[ind[i]]=%u\n",i,ind[i],a[ind[i]]);
   printf("\n");   fflush( stdout );
   return 0;
}


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;                              /* Compute a hash to compare the results of different methods. */
   unsigned int chk=0;
   for (i=0;i<m;i++){
      chk=((chk<<1)|(chk>>31))^(ind[i]);
   }
   printf("chk = %10X\n",chk);
   return 0;
}



int main(int argc, char **argv){
int n, i, m; 
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];

sscanf(argv[1],"%d",&n);            /* Length of array a.                                    */
n=n>>5;                             /* Adjust n to a multiple of 32.                         */
n=n<<5;
sscanf(argv[2],"%u",&d);            /* The approximate fraction of nonzeros in a is: d/1024  */
printf("n=%d,   d=%u\n",n,d);

a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);    

                                    /* Generate a pseudo random array a.                     */
j=73659343;                   
for (i=0;i<n;i++){
   j=j*653+1;
   k=(j & 0x3FF00)>>8;              /* k is a pseudo random number between 0 and 1023        */
   if (k<d){
      a[i] = (j&0xFE)+1;            /* Set a[i] to nonzero.                                  */
   }else{
      a[i] = 0;
   }

}

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d,   a[i]=%u\n",i,a[i]);}} printf("\n");   */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext:   ";
char txt2[]="arr2ind_if:     ";
char txt3[]="arr2ind_cmov:   ";

nrep=10000;                                   /* Repeat a function nrep times to make relatively accurate timings possible.                          */
                                              /* With nrep=1000000:   ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 519                    */
                                              /* With nrep=10000:     ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513                  */
printf("nrep = \%d  \n\n",nrep);
arr2ind_movmsk(a,n,ind,&m);                   /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
   t0=omp_get_wtime();
   switch (meth){
      case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);         strcpy(txt,txt0); break;
      case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);           strcpy(txt,txt1); break;
      case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);             strcpy(txt,txt2); break;
      case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);           strcpy(txt,txt3); break;
      default: ;
   }
   t1=omp_get_wtime();
   printf("method = %s  ",txt);
   /* print_chk(a,ind,m);  */
   printf(" elapsed time = %6.2f\n",t1-t0);
}
print_nonz(a, ind, 2);                                            /* Do something with the results                 */
printf("density = %f %% \n\n",((double)m)/((double)n)*100);       /* Actual nonzero density of array a.            */

/* print_nonz(a, ind, m);    */  /* Uncomment this line to print the indices of the nonzeros.                      */

return 0;
}

/*
With nrep=1000000:
 ./a.out 10016 4 ;    ./a.out 10016 4 ;    ./a.out 10016 48 ;    ./a.out 10016 48 ;    ./a.out 10016 519  ;    ./a.out 10016 519       
With nrep=10000:  
 ./a.out 1000000 5 ;  ./a.out 1000000 5 ;  ./a.out 1000000 52 ;  ./a.out 1000000 52 ;  ./a.out 1000000 513 ;   ./a.out 1000000 513     
*/


The code was tested with array sizes of n=10016 (the data fits in the L1 cache) and n=1000000, with nonzero densities of about 0.5%, 5% and 50%. For accurate timing the functions were called 1000000 and 10000 times, respectively.


Time in seconds, size n=10016, 1e6 function calls. Intel Core i5-6500.
                     0.53%        5.1%       50.0%
arr2ind_movmsk:       0.27        0.53        4.89
arr2ind_pext:         1.44        1.59        1.45
arr2ind_if:           5.93        8.95       33.82
arr2ind_cmov:         6.82        6.83        6.82

Time in seconds, size n=1000000, 1e4 function calls.

                     0.49%        5.1%       50.1%
arr2ind_movmsk:       0.57        2.03        5.37
arr2ind_pext:         1.47        1.47        1.46
arr2ind_if:           5.88        8.98       38.59
arr2ind_cmov:         6.82        6.81        6.81


In these examples the vectorized loops are faster than the scalar loops. The performance of arr2ind_movmsk depends strongly on the density of a: it is only faster than arr2ind_pext if the density is sufficiently small. The break-even point also depends on the array size n. Function arr2ind_if clearly suffers from branch mispredictions at 50% nonzero density.

Unasked answered 31/1, 2017 at 13:11 Comment(0)

If you expect the number of nonzero elements to be very low (i.e. much less than 1%), then you can simply check whether each 16-byte chunk contains any nonzero byte:

    int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()));
    if (mask != 65535) {
        //handle the zero bits of mask (the positions of the nonzero bytes) with scalar code
    }

If the percentage of nonzero elements is sufficiently small, the cost of the mispredicted branches and of the slow scalar code inside the 'if' is negligible.
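
To make the idea concrete, here is a minimal sketch of the whole loop with the scalar part filled in (untested, in the spirit of arr2ind_movmsk from the other answer). It assumes n is a multiple of 16, ind is large enough, and BMI1 is available for _tzcnt_u32; the function name sparse_indices is only illustrative:

#include <immintrin.h>

/* Sketch: collect indices of nonzero bytes, with a scalar tail per 16-byte chunk. */
int sparse_indices(const unsigned char *a, int n, int *ind) {
    int m = 0;
    for (int i = 0; i < n; i += 16) {
        __m128i reg = _mm_loadu_si128((const __m128i *)&a[i]);
        int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()));
        if (mask != 65535) {                        /* at least one nonzero byte    */
            unsigned int nz = ~mask & 0xFFFF;       /* set bits mark nonzero bytes  */
            while (nz) {
                ind[m++] = i + _tzcnt_u32(nz);      /* position of lowest set bit   */
                nz &= nz - 1;                       /* clear lowest set bit         */
            }
        }
    }
    return m;                                       /* number of indices written    */
}

With a nonzero fraction well below 1% the branch is almost never taken, which is exactly the case this approach is aimed at.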


As for a good general solution, first consider an SSE implementation of stream compaction. It removes all zero elements from a byte array (idea taken from here):

__m128i shuf [65536]; //must be precomputed
char    cnt  [65536]; //must be precomputed

int compress(const char *src, int len, char *dst) {
    char *ptr = dst;
    for (int i = 0; i < len; i += 16) {
        __m128i reg = _mm_load_si128((__m128i*)&src[i]);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
        _mm_storeu_si128((__m128i*)ptr, compressed);
        ptr += cnt[mask];   //alternative:   ptr += 16-_mm_popcnt_u32(mask);
    }
    return ptr - dst;
}

As you see, (_mm_shuffle_epi8 + lookup table) can do wonders. I don't know of any other way of vectorizing structurally complex code like stream compaction.
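
The two lookup tables only have to be filled once at startup. One possible (untested) precomputation of the shuf and cnt arrays declared above is sketched here; it assumes the convention of the code above, i.e. bit j of mask is set when byte j is zero, shuf[mask] moves the nonzero bytes to the front, and cnt[mask] counts them. The function name init_compress_luts is just illustrative:

void init_compress_luts(void) {
    for (int mask = 0; mask < 65536; mask++) {
        unsigned char idx[16];
        int k = 0;
        for (int j = 0; j < 16; j++)
            if (!(mask & (1 << j)))                 /* bit j clear => byte j is nonzero */
                idx[k++] = (unsigned char)j;
        cnt[mask] = (char)k;                        /* number of nonzero bytes          */
        for (int j = k; j < 16; j++)
            idx[j] = 0x80;                          /* pshufb writes zero for these     */
        shuf[mask] = _mm_loadu_si128((const __m128i *)idx);
    }
}

Call it once before the first call to compress; the shuf table itself is 65536*16 bytes = 1 MiB, plus 64 KB for cnt.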


Now the only remaining problem with your request is that you want to get indices. Each index must be stored as a 4-byte value, so a chunk of 16 input bytes may produce up to 64 bytes of output, which does not fit into a single SSE register.

One way to handle this is to honestly unpack the output to 64 bytes. So you replace reg with the constant (0,1,2,3,...,15) in the code, then unpack the SSE register into 4 registers, and add a register holding four copies of i. This takes many more instructions: 6 unpacks, 4 adds, and 3 extra stores (one store is already there). To me that is a huge overhead, especially if you expect less than 25% nonzero elements.
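
A rough, untested sketch of that fully unpacked variant is shown below; it reuses the shuf table from the compress example above, needs popcnt support, and requires dst to be padded, because up to 16 indices are stored per chunk and the unused ones are only overwritten by the next iteration:

int compress_ids_full(const char *src, int len, int *dst) {
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i zero    = _mm_setzero_si128();
    int *ptr = dst;
    for (int i = 0; i < len; i += 16) {
        __m128i reg  = _mm_loadu_si128((__m128i*)&src[i]);
        int mask     = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, zero));
        __m128i ids8 = _mm_shuffle_epi8(offsets, shuf[mask]);   /* compacted in-chunk offsets  */
        __m128i lo16 = _mm_unpacklo_epi8(ids8, zero);           /* bytes 0..7  -> 16-bit words */
        __m128i hi16 = _mm_unpackhi_epi8(ids8, zero);           /* bytes 8..15 -> 16-bit words */
        __m128i base = _mm_set1_epi32(i);
        _mm_storeu_si128((__m128i*)&ptr[ 0], _mm_add_epi32(_mm_unpacklo_epi16(lo16, zero), base));
        _mm_storeu_si128((__m128i*)&ptr[ 4], _mm_add_epi32(_mm_unpackhi_epi16(lo16, zero), base));
        _mm_storeu_si128((__m128i*)&ptr[ 8], _mm_add_epi32(_mm_unpacklo_epi16(hi16, zero), base));
        _mm_storeu_si128((__m128i*)&ptr[12], _mm_add_epi32(_mm_unpackhi_epi16(hi16, zero), base));
        ptr += 16 - _mm_popcnt_u32(mask);                       /* number of nonzero bytes     */
    }
    return ptr - dst;
}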

Alternatively, you can limit the number of nonzero bytes processed per loop iteration to 4, so that one register is always enough for the output. Here is the sample code:

__m128i shufMask [65536]; //must be precomputed
char    srcMove  [65536]; //must be precomputed
char    dstMove  [65536]; //must be precomputed

int compress_ids(const char *src, int len, int *dst) {
    const char *ptrSrc = src;
    int *ptrDst = dst;
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i base = _mm_setzero_si128();
    while (ptrSrc < src + len) {
        __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
        __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
        ids32 = _mm_add_epi32(ids32, base);
        _mm_storeu_si128((__m128i*)ptrDst, ids32);
        ptrDst += dstMove[mask];    //alternative:   ptrDst += min(16-_mm_popcnt_u32(mask), 4);
        ptrSrc += srcMove[mask];    //no alternative without LUT
        base = _mm_add_epi32(base, _mm_set1_epi32(srcMove[mask]));   //base tracks ptrSrc - src, so advance it by srcMove
    }
    return ptrDst - dst;
}

One drawback of this approach is that each loop iteration cannot start until the pointer updates ptrSrc += srcMove[mask]; and ptrDst += dstMove[mask]; of the previous iteration have executed, so the critical path has grown dramatically. Hardware hyperthreading, or its manual emulation (interleaving two independent input streams in one loop), can hide this penalty.


So, as you see, there are many variations of this basic idea, all of which solve your problem with different degrees of efficiency. You can also reduce the size of the LUT if you don't like it (again, at the cost of lower throughput).

This approach cannot be fully extended to wider registers (i.e. AVX2 and AVX-512), but you can try to combine the instructions of several consecutive iterations into a single AVX2 or AVX-512 instruction, thus slightly increasing throughput.

Note: I didn't test any of the code (because precomputing the LUTs correctly requires noticeable effort).
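
For reference, the three tables of compress_ids could be precomputed along the following lines (equally untested). It follows the semantics assumed above: at most 4 nonzero positions are emitted per iteration, dstMove[mask] is the number of indices written, and srcMove[mask] is how far the source pointer advances (16 if the chunk holds at most 4 nonzero bytes, otherwise the position of the 5th nonzero byte, so it is picked up by the next iteration). The function name is illustrative:

void init_compress_ids_luts(void) {
    for (int mask = 0; mask < 65536; mask++) {
        unsigned char sm[16];
        int written = 0, consumed = 16;
        for (int j = 0; j < 16; j++) {
            if (!(mask & (1 << j))) {               /* byte j is nonzero                  */
                if (written < 4) {
                    sm[written++] = (unsigned char)j;
                } else {
                    consumed = j;                   /* resume at the 5th nonzero byte     */
                    break;
                }
            }
        }
        for (int j = written; j < 16; j++)
            sm[j] = 0x80;                           /* pshufb writes zero for these lanes */
        shufMask[mask] = _mm_loadu_si128((const __m128i *)sm);
        dstMove[mask]  = (char)written;
        srcMove[mask]  = (char)consumed;
    }
}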

Ulent answered 11/1, 2017 at 15:22 Comment(3)
It would be nice to see how your LUT approach compares to my answer based on the Bit Manipulation Instructions (BMI1 and BMI2).Unasked
Do you, by any chance, know a way to reduce the size of this table? I mean - this will be a megabyte of prebuilt masks, if I'm correct. And using this approach for __m256i just isn't really feasible. I know I can divide a __m256i into 2 __m128i and a __m128i into 64-bit pieces, but maybe you know how to do this better.Racecourse
@Denis Yaroshevskiy. It is possible to generate the shuffle mask on the fly using BMI2 instructions, see arr2ind_pext in the answer by @UnaskedUlent

Although the AVX2 instruction set has gather instructions, their performance is too slow for this task. The most effective way to do this is to process the array manually.

Fungistat answered 2/10, 2013 at 14:30 Comment(0)
