How to count character occurrences using SIMD
Asked Answered
A

3

8

I am given a array of lowercase characters (up to 1.5Gb) and a character c. And I want to find how many occurrences are of the character c using AVX instructions.

    unsigned long long char_count_AVX2(char * vector, int size, char c){
    unsigned long long sum =0;
    int i, j;
    const int con=3;
    __m256i ans[con];
    for(i=0; i<con; i++)
        ans[i]=_mm256_setzero_si256();

    __m256i Zer=_mm256_setzero_si256();
    __m256i C=_mm256_set1_epi8(c);
    __m256i Assos=_mm256_set1_epi8(0x01);
    __m256i FF=_mm256_set1_epi8(0xFF);
    __m256i shield=_mm256_set1_epi8(0xFF);
    __m256i temp;
    int couter=0;
    for(i=0; i<size; i+=32){
        couter++;
        shield=_mm256_xor_si256(_mm256_cmpeq_epi8(ans[0], Zer), FF);
        temp=_mm256_cmpeq_epi8(C, *((__m256i*)(vector+i)));
        temp=_mm256_xor_si256(temp, FF);
        temp=_mm256_add_epi8(temp, Assos);
        ans[0]=_mm256_add_epi8(temp, ans[0]);
        for(j=1; j<con; j++){
            temp=_mm256_cmpeq_epi8(ans[j-1], Zer);
            shield=_mm256_and_si256(shield, temp);
            temp=_mm256_xor_si256(shield, FF);
            temp=_mm256_add_epi8(temp, Assos);
            ans[j]=_mm256_add_epi8(temp, ans[j]);
        }
    }
    for(j=con-1; j>=0; j--){
        sum<<=8;
        unsigned char *ptr = (unsigned char*)&(ans[j]);
        for(i=0; i<32; i++){
            sum+=*(ptr+i);
        }
    }
    return sum;
}
Annmaria answered 5/2, 2019 at 18:47 Comment(8)
What is your character format? ASCII or some kind of Unicode?Frictional
The format is ASCIIAnnmaria
AVX1 or AVX2? What have you tried? Hint: Check _mm256_cmpeq_epi8 and _mm256_sub_epi8 for the most inner loop. After 255 iterations you need to start combining two bytes into one uint16, and so onFina
AVX2, well until now I have an array of 4 __m256i variables and I am pushing the overflow from index 0 until 3.Annmaria
Can you show some code of what you did? For adding 8bit integers to larger integers, what this question: stackoverflow.com/questions/54541127Fina
_mm256_cmpeq_epi8 will get you a -1 in each byte. If you subtract that from a counter (using _mm256_sub_epi8) you can directly count up to 255 or 128, i.e., your most inner loop should just contain these two intrinsics.Fina
@Adamos2468: I added efficient horizontal-sum code to chtz's answer, using vpsadbw against zero for byte to qword. And efficient shuffles for 256-bit down to 64-bit like Fastest way to do horizontal float vector sum on x86.Choppy
One core can't usually saturate DRAM bandwidth, so for large inputs it might be worth using multiple threads (especially if you already have a worker thread started and can just send it a function pointer and args). You tagged this parallel-processing, are you asking for OpenMP or something as well?Choppy
F
4

I'm intentionally leaving out some parts, which you need to figure out yourself (e.g. handling lengths that aren't a multiple of 4*255*32 bytes), but your most inner loop should look something like the one starting with for(int i...):

_mm256_cmpeq_epi8 will get you a -1 in each byte, which you can use as an integer. If you subtract that from a counter (using _mm256_sub_epi8) you can directly count up to 255 or 128. The inner loop contains just these two intrinsics. You have to stop and

#include <immintrin.h>
#include <stdint.h>

static inline
__m256i hsum_epu8_epu64(__m256i v) {
    return _mm256_sad_epu8(v, _mm256_setzero_si256());  // SAD against zero is a handy trick
}

static inline
uint64_t hsum_epu64_scalar(__m256i v) {
    __m128i lo = _mm256_castsi256_si128(v);
    __m128i hi = _mm256_extracti128_si256(v, 1);
    __m128i sum2x64 = _mm_add_epi64(lo, hi);   // narrow to 128

    hi = _mm_unpackhi_epi64(sum2x64, sum2x64);
    __m128i sum = _mm_add_epi64(hi, sum2x64);  // narrow to 64
    return _mm_cvtsi128_si64(sum);
}


unsigned long long char_count_AVX2(char const* vector, size_t size, char c)
{
    __m256i C=_mm256_set1_epi8(c);

    // todo: count elements and increment `vector` until it is aligned to 256bits (=32 bytes)
    __m256i const * simd_vector = (__m256i const *) vector;
     // *simd_vector is an alignment-required load, unlike _mm256_loadu_si256()

    __m256i sum64 = _mm256_setzero_si256();
    size_t unrolled_size_limit = size - 4*255*32 + 1;
    for(size_t k=0; k<unrolled_size_limit ; k+=4*255*32) // outer loop: TODO
    {
        __m256i counter[4]; // multiple counter registers to hide latencies
        for(int j=0; j<4; j++)
            counter[j]=_mm256_setzero_si256();
        // inner loop: make sure that you don't go beyond the data you can read
        for(int i=0; i<255; ++i)
        {   // or limit this inner loop to ~22 to avoid branch mispredicts
            for(int j=0; j<4; ++j)
            {
                counter[j]=_mm256_sub_epi8(counter[j],           // count -= 0 or -1
                                           _mm256_cmpeq_epi8(*simd_vector, C));
                ++simd_vector;
            }
        }

        // only need one outer accumulator: OoO exec hides the latency of adding into it
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[0]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[1]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[2]));
        sum64 = _mm256_add_epi64(sum64, hsum_epu8_epu64(counter[3]));
    }

    uint64_t sum = hsum_epu64_scalar(sum64);

    // TODO add up remaining bytes with sum.
    // Including a rolled-up vector loop before going scalar
    //  because we're potentially a *long* way from the end

    // Maybe put some logic into the main loop to shorten the 255 inner iterations
    // if we're close to the end.  A little bit of scalar work there shouldn't hurt every 255 iters.

    return sum;
}

Godbolt link: https://godbolt.org/z/do5e3- (clang is slightly better than gcc at unrolling the most inner loop: gcc includes some useless vmovdqa instructions that will bottleneck the front-end if the data is hot in L1d cache, preventing us from running close to 2x 32-byte loads per clock)

Fina answered 5/2, 2019 at 21:4 Comment(5)
The widening to epu64 can and should be done with _mm256_sad_epu8(counter, _mm256_setzero_si256()), then _mm256_add_epi64 into one vector which you hsum at the very end.Choppy
I added code that does that hsum, and an outer loop size limit. Note that clang uses indexed addressing modes, so it's no closer than gcc to running at 2 loads per clock on Haswell/Skylake. :( They'll unlaminate from vpcmpeqb into separate uops in the issue stage. Writing it with the loop bounds as pointer-comparison might be a better bet, and get clang to just do pure pointer increments instead of silly indexing. e.g. const char *endp = min(buf + size, buf + 4*255*32) or something.Choppy
Thanks @PeterCordes for improving this! I guess for gcc it would be better to manually unroll the inner loop (i.e., create 4 variables instead of an array). Nice trick with vpsadbw.Fina
Yeah, that might help GCC avoid the stupid vmovdqa instructions. Worth trying if you're curious. Or file a missed-optimization bug; it's already optimized away any store/reload of the array of 4 vectors, and this is clearly something that it should be able to optimize. Anyway, gcc's -funroll-loops is only enabled manually or as part of -fprofile-use; for large codebases unrolling every loop hurts more than it helps, but profile-use will identify the hot loops and unroll them. I'd assume unrolling would let it avoid extra movdqa as well.Choppy
The vpsadbw trick is relatively well known for hsumming 8-bit data, and worth using even for signed by XORing to range-shift and then subtracting the 16 or 32 * 128 bias at the end. I think Agner Fog's optimization guide mentions it, or at least his VectorClass library uses it.Choppy
F
3

If you don't insist on using only SIMD instructions, you can make use
of the VPMOVMSKB instruction in combination with the POPCNT instruction. The former combines the highest bits of each byte into a 32-bit integer mask and the latter counts the 1 bits in this integer (=the count of char matches).

int couter=0;
for(i=0; i<size; i+=32) {
  ...
  couter += 
    _mm_popcnt_u32( 
      (unsigned int)_mm256_movemask_epi8( 
        _mm256_cmpeq_epi8( C, *((__m256i*)(vector+i) ))
      ) 
    );
  ...
}    

I haven't tested this solution, but you should get the gist.

Frictional answered 5/2, 2019 at 20:9 Comment(4)
I had the same idea in OP's other, now-deleted question. GMTA.Earwitness
Doing _mm256_movemask_epi8 and _mm_popcnt_u32 in the inner loop is far less efficient than _mm256_sub_epi8Fina
I guess so. But it is an alternative that is worth mentioning for its simplicity.Frictional
Possibly useful as part of a cleanup loop, or for an unaligned start/end where you shift out some of the bits before you popcnt, using a shift count calculated from the overlap. Otherwise the more reasonable "simple" version is putting the psadbw epu8->epu64 hsum inside the inner loop and using _mm256_add_epi64. That's only 1 extra instruction per vector vs. the efficient way, vs. 2 (vpcmpeqb + vpmovmskb + popcnt + add vs. vpcmpeqb (+vpsadbw) + vpsubb / q).Choppy
P
3

Probably the fastest: memcount_avx2 and memcount_sse2

size_t memcount_avx2(const void *s, int c, size_t n) 
{    
  __m256i cv = _mm256_set1_epi8(c), 
          zv = _mm256_setzero_si256(), 
         sum = zv, acr0,acr1,acr2,acr3;
  const char *p,*pe;    

  for(p = s; p != (char *)s+(n- (n % (252*32)));) 
  { 
    for(acr0 = acr1 = acr2 = acr3 = zv, pe = p+252*32; p != pe; p += 128) 
    {
      acr0 = _mm256_sub_epi8(acr0, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)p))); 
      acr1 = _mm256_sub_epi8(acr1, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+32)))); 
      acr2 = _mm256_sub_epi8(acr2, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+64)))); 
      acr3 = _mm256_sub_epi8(acr3, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)(p+96)))); 
      __builtin_prefetch(p+1024);
    }
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr0, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr1, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr2, zv));
    sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr3, zv));
  } 

  for(acr0 = zv; p+32 < (char *)s + n; p += 32)  
    acr0 = _mm256_sub_epi8(acr0, _mm256_cmpeq_epi8(cv, _mm256_lddqu_si256((const __m256i *)p))); 
  sum = _mm256_add_epi64(sum, _mm256_sad_epu8(acr0, zv));

  size_t count = _mm256_extract_epi64(sum, 0) 
               + _mm256_extract_epi64(sum, 1) 
               + _mm256_extract_epi64(sum, 2) 
               + _mm256_extract_epi64(sum, 3);  

  while(p != (char *)s + n) 
      count += *p++ == c;
  return count;
}

Benchmark skylake i7-6700 - 3.4GHz - gcc 8.3:

memcount_avx2 : 28 GB/s
memcount_sse: 23 GB/s
char_count_AVX2 : 23 GB/s (from post)

Pricefixing answered 13/9, 2019 at 20:26 Comment(4)
You can use _mm256_sub_epi8 to accumulate the cmpeq results instead of wasting instructions in the outer loop. Also, this code is too compact for its own good and not indented properly. (Maybe a tab vs. space problem in the SO markdown?) It took me a while to find where you were zeroing acr0..3 between inner-loop iterations; would make more sense to declare them inside the outer loop. I don't think there are any compilers that support AVX2 but not C99. I'd also do the end-pointer calculations on separate source lines.Choppy
I don't see what you mean, by wasting instructionsPricefixing
If you use acr0 = _mm256_sub_epi8(acr0, cmp(...)) then the outer loop can just use acr0 instead of _mm256_sub_epi8(zv, acr0). Use x -= -1 instead of x += -1 inside the inner loop. That sub is a wasted instruction in your version.Choppy
Thanks Peter, I've made the changes and a new benchmarkPricefixing

© 2022 - 2024 — McMap. All rights reserved.