Loop vectorization - counting matches of 7-byte records with masking

I have a fairly simple loop:

auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
    equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
    rowsFound += equals;
    indexRecord += byteSize; // byteSize is 7
}

Where BitString::equals is:

static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
    return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}

This code is used to simulate bitmap index querying in databases. My question is whether there's a way to vectorize the loop that goes through all the records. When trying to compile with GCC and -fopt-info-vec-missed -O3 I get: missed: couldn't vectorize loop.

I am new to this kind of optimization and would like to learn more; it just feels like I am missing something.

EDIT: First of all, thank you all for the answers. I should have included a reproducible example; here it is now, with all the functionality needed, as close to the real code as I could get. All of this is done on an x86-64 platform and I have both GCC and Clang available.

#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>

constexpr short BYTE_SIZE = 8;

class BitString {
public:
    static int getByteSizeFromBits(int bitSize) {
        return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
    }

    static void setBitString(char *rec, int bitOffset) {
        rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
    }

    static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
        return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
    }
};

// Class representing a table schema
class TableSchema {
public:
    // number of attributes of a table
    unsigned int attrs_count = -1;
    // the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
    unsigned int *attr_sizes = nullptr;
    // max value (domain) of an attribute, -1 for unlimited
    int *attr_max_values = nullptr;
    // the offset of each attribute, to simplify some pointer arithmetic for further use
    unsigned int *attribute_offsets = nullptr;
    // sum of attr_sizes is the record size
    unsigned int record_size = -1;

    void calculate_offsets() {
        if (attrs_count <= 0 || attribute_offsets != nullptr) {
            return;
        }

        attribute_offsets = new unsigned int[attrs_count];
        int offset = 0;
        for (int i = 0; i < attrs_count; ++i) {
            attribute_offsets[i] = offset;
            offset += attr_sizes[i];
        }
        record_size = offset;
    }

    TableSchema() = default;

    ~TableSchema() {
        if (attribute_offsets != nullptr) {
            delete[] attribute_offsets;
            attribute_offsets = nullptr;
        }
        attrs_count = -1;
    }
};


class BitmapIndex {
private:
    char *mData = nullptr;
    short bitSize = 0;
    int byteSize = 0;
    int attrsCount = 0;
    int *attrsMaxValue = nullptr;
    int *bitIndexAttributeOffset = nullptr;
    unsigned int recordCount = 0;
    char *SelectMask;

    unsigned int capacity = 0;

    inline char *getRowPointer(unsigned int rowId) const {
        return mData + rowId * byteSize;
    }

    inline bool shouldColBeIndexed(int max_col_value) const {
        return max_col_value > 0;
    }

public:
    BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
        auto maxValuesSum = 0;
        attrsMaxValue = new int[attrs_count];
        attrsCount = attrs_count;
        bitIndexAttributeOffset = new int[attrs_count];
        auto bitOffset = 0;
        // attribute's max value is the same as number of bits used to encode the current value
        // e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
        for (int i = 0; i < attrs_count; ++i) {
            attrsMaxValue[i] = attrs_max_value[i];
            bitIndexAttributeOffset[i] = bitOffset;
            // col is indexed only if its max value is > 0; -1 means the column is not indexed
            if (!shouldColBeIndexed(attrs_max_value[i]))
                continue;
            maxValuesSum += attrs_max_value[i];
            bitOffset += attrs_max_value[i];
        }
        bitSize = (short) maxValuesSum;
        byteSize = BitString::getByteSizeFromBits(bitSize);
        mData = new char[byteSize * capacity];
        memset(mData, 0, byteSize * capacity);
        SelectMask = new char[byteSize];
        this->capacity = capacity;
    }

    ~BitmapIndex() {
        if (mData != nullptr) {
            delete[] mData;
            mData = nullptr;
            delete[] attrsMaxValue;
            attrsMaxValue = nullptr;

            delete[] SelectMask;
            SelectMask = nullptr;
        }
    }

    unsigned long getTotalByteSize() const {
        return byteSize * capacity;
    }

    // add record to index
    void addRecord(const char * record, const unsigned int * attribute_sizes) {
        auto indexRecord = getRowPointer(recordCount);
        unsigned int offset = 0;
        for (int j = 0; j < attrsCount; ++j) {
            if (attrsMaxValue[j] != -1) {
                // byte col value
                char colValue = *(record + offset);
                if (colValue > attrsMaxValue[j]) {
                    throw std::runtime_error("Col value is bigger than max allowed value!");
                }
//            printf("%d ", colValue);
                BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
            }
            offset += attribute_sizes[j];
        }
        recordCount += 1;
    }

    // SELECT COUNT(*)
    int Select(const char *query) const {
        uint64_t rowsFound = 0;
        memset(SelectMask, 0, byteSize);
        for (int col = 0; col < attrsCount; ++col) {
            if (!shouldColBeIndexed(attrsMaxValue[col])) {
                continue;
            }
            auto col_value = query[col];
            if (col_value < 0) {
                for (int i = 0; i < attrsMaxValue[col]; ++i) {
                    BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
                }
            } else {
                BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
            }
        }

        uint64_t maxBytesValue = 0;
        uint64_t byteVals = 0xff;
        for (int i = 0; i < byteSize; ++i) {
            maxBytesValue |= byteVals << (i * 8);
        }

        auto indexRecord = getRowPointer(0);
        for (int i = 0; i < recordCount; ++i) {
            rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
            indexRecord += byteSize;
        }
        return rowsFound;
    }
};


void generateRecord(
        char *record,
        const unsigned int attr_sizes[],
        const int attr_max_value[],
        int attr_count
    ) {
    auto offset = 0;
    for (int c = 0; c < attr_count; ++c) {
        if (attr_max_value[c] == -1) {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % 256;
            }
        } else {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % attr_max_value[c];
            }
        }
        offset += attr_sizes[c];
    }
}

int main() {
    TableSchema schema;
    const int attribute_count = 13;
    const int record_count = 1000000;
    // for simplicity's sake, attr_max_value > 0 is set only for attributes whose size is 1.
    unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
    int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
    schema.attrs_count = attribute_count;
    schema.attr_sizes = attr_sizes;
    schema.attr_max_values = attr_max_values;
    schema.calculate_offsets();

    srand((unsigned ) time(nullptr));

    BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);

    char *record = new char[schema.record_size];
    for (int i = 0; i < record_count; ++i) {
        // generate some random records and add them to the index
        generateRecord(record, attr_sizes, attr_max_values, attribute_count);
        bitmapIndex.addRecord(record, attr_sizes);
    }

    char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
    // simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
    auto found = bitmapIndex.Select(query);

    printf("Query found: %d records\n", found);

    delete[] record;
    return 0;
}
Unitarianism answered 6/4, 2022 at 20:38 Comment(5)
It looks possible. The 7-byte record size is not very nice though, requiring a shuffle and lots of unaligned loads; is there any chance it could be padded out to 8 bytes, or would that cost too much? – Jarib
It definitely can be padded out, but the overall index size would increase. For now, we can assume that padding is OK and size doesn't matter. – Willful
You could load two records into bits [8:119] of an SSE register. Your bit-operations boil down to (~mask & maxVal & record) == 0. In your example SelectMask does not change, so ~mask & maxVal could be computed outside the loop, which means your loop would just be a pand (where the mask needs to be loaded once into bits [8:63] and [64:119]), a pcmpeqq with 0, and a psubq to accumulate. With AVX2, you could load two pairs using a 128-bit load and a 128-bit insert -- if you want a working solution, post a minimal reproducible example! (Not sure what you can achieve with auto-vectorization.) – Clap
*( uint64_t * ) mask - I hope you're compiling with -fno-strict-aliasing, otherwise this is unsafe. (Unless the only other accesses to the storage are via char*.) Actually it is still unsafe with a misaligned uint64_t*. You could be using GNU C typedef uint64_t aliasing_unaligned_u64 __attribute__((aligned(1),may_alias)) as shown in Why does unaligned access to mmap'ed memory sometimes segfault on AMD64?. Manual vectorization is fairly straightforward, though, as chtz describes, loading so that two 7-byte records are split across the middle of a 16-byte vector. – Ryanryann
Minor code review: class BitString would make more sense as a namespace BitString {}, since it only contains static functions. Also, constexpr short BYTE_SIZE = 8; already has a name in C, CHAR_BIT, or in C++ std::numeric_limits<unsigned char>::digits. And if you don't care how narrow a constant is, make it int, not short. Fortunately C++ promotes narrow types to int anyway, so bitSize + BYTE_SIZE doesn't truncate anything to short. – Ryanryann

If the record size were 8, both GCC and Clang would autovectorize. For example (hopefully a sufficiently representative stand-in for the actual context in which the code occurs):

int count(char * indexRecord, const char * SelectMask, uint64_t maxVal)
{
    bool equals;
    uint64_t rowsFound = 0;
    // some arbitrary number of records
    for (int i = 0; i < 1000000; ++i) {
        equals = tequals(SelectMask, indexRecord, maxVal); // tequals: same test as BitString::equals above
        rowsFound += equals;
        indexRecord += 8; // record size padded out to 8
    }
    return rowsFound;
}

The important part of it, as compiled by GCC, looks like this:

.L4:
    vpand   ymm0, ymm2, YMMWORD PTR [rdi]
    add     rdi, 32
    vpcmpeqq        ymm0, ymm0, ymm3
    vpsubq  ymm1, ymm1, ymm0
    cmp     rax, rdi
    jne     .L4

Not bad. It uses the same ideas that I would use manually: vpand the data with a mask (a simplification of your bitwise logic), compare it to zero, and subtract the results of the comparisons (subtract because a True result is indicated with -1) from 4 counters packed in a vector. The four separate counts are added after the loop.

By the way, note that I made rowsFound a uint64_t. That's important. If rowsFound is not 64-bit, both Clang and GCC will try very hard to narrow the count ASAP, which is exactly the opposite of a good approach: it costs many more instructions in the loop and has no benefit. If the count is intended to be a 32-bit int in the end, it can simply be narrowed after the loop, where doing so is probably not merely cheap but actually free.

Something equivalent to that code would not be difficult to write manually with SIMD intrinsics. That would make the code less brittle (it wouldn't depend on hoping that compilers do the right thing), but it would no longer work for non-x86 platforms.
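As a rough illustration, a sketch of that intrinsics equivalent for 8-byte (padded) records could look like the code below. This is my sketch (untested), not part of the original answer; it mirrors the asm above (vpand, vpcmpeqq against zero, vpsubq into four packed counters) and assumes the mask passed in is already the combined ~SelectMask & maxVal.

#include <immintrin.h>
#include <cstdint>
#include <cstring>

// Hypothetical sketch: count 8-byte padded records matching the mask,
// 4 records per AVX2 vector.
uint64_t count8(const char * records, size_t recordCount, uint64_t mask)
{
    __m256i vmask = _mm256_set1_epi64x((long long)mask); // mask = ~SelectMask & maxVal
    __m256i zero  = _mm256_setzero_si256();
    __m256i acc   = _mm256_setzero_si256();
    size_t i = 0;
    for (; i + 4 <= recordCount; i += 4) {
        __m256i v = _mm256_loadu_si256((const __m256i *)(records + i * 8));
        // a record matches when (record & mask) == 0; equal lanes become -1
        __m256i isMatch = _mm256_cmpeq_epi64(_mm256_and_si256(v, vmask), zero);
        acc = _mm256_sub_epi64(acc, isMatch);             // subtract -1 per match
    }
    // horizontal sum of the four 64-bit counters
    __m128i lo = _mm256_castsi256_si128(acc);
    __m128i hi = _mm256_extracti128_si256(acc, 1);
    lo = _mm_add_epi64(lo, hi);
    uint64_t rowsFound = (uint64_t)_mm_cvtsi128_si64(lo) + (uint64_t)_mm_extract_epi64(lo, 1);
    // scalar tail for the last few records
    for (; i < recordCount; ++i) {
        uint64_t rec;
        std::memcpy(&rec, records + i * 8, 8);
        rowsFound += ((rec & mask) == 0);
    }
    return rowsFound;
}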

If the records are supposed to stay 7 bytes, that's a more annoying problem to deal with. GCC gives up; Clang actually goes ahead with its auto-vectorization, but it's not good: the 8-byte loads are all done individually and the results are then put together in a vector, which is all a big waste of time.

When doing it manually with SIMD intrinsics, the main problem is unpacking the 7-byte records into qword lanes. An SSE4.1 version could use pshufb to do this easily (pshufb is from SSSE3, but pcmpeqq is from SSE4.1, so it makes sense to target SSE4.1). An AVX2 version could do a load that starts 2 bytes before the first record it's trying to load, such that the "split" between the two 128-bit halves of the 256-bit register falls between two records. Then vpshufb, which cannot move bytes from one 128-bit half to the other, can still move the bytes into place because none of them need to cross into the other half.

For example, an AVX2 version with manual vectorization and 7-byte records could look something like this. It requires either some padding at both the start and the end, or skipping the first record, stopping before the last record, and handling those separately. Not tested, but it should at least give you some idea of how code with manual vectorization would work.

#include <immintrin.h>
#include <cstdint>

int count(char * indexRecord, uint64_t SelectMask, uint64_t maxVal)
{
    __m256i mask = _mm256_set1_epi64x(~SelectMask & maxVal);
    __m256i count = _mm256_setzero_si256();
    __m256i zero = _mm256_setzero_si256();
    // spread each 7-byte record into its own qword lane; -1 zeroes the 8th byte
    __m256i shufmask = _mm256_setr_epi8(2, 3, 4, 5, 6, 7, 8, -1, 9, 10, 11, 12, 13, 14, 15, -1, 0, 1, 2, 3, 4, 5, 6, -1, 7, 8, 9, 10, 11, 12, 13, -1);
    for (int i = 0; i < 1000000; ++i) {
        // the load starts 2 bytes early so the split between the two 128-bit halves
        // falls between the 2nd and 3rd of the four records, keeping vpshufb in-lane
        __m256i records = _mm256_loadu_si256((__m256i*)(indexRecord - 2));
        indexRecord += 7 * 4;
        records = _mm256_shuffle_epi8(records, shufmask);
        __m256i isZero = _mm256_cmpeq_epi64(_mm256_and_si256(records, mask), zero);
        count = _mm256_sub_epi64(count, isZero);
    }
    // horizontal sum of the 4 packed counters
    __m128i countA = _mm256_castsi256_si128(count);
    __m128i countB = _mm256_extracti128_si256(count, 1);
    countA = _mm_add_epi64(countA, countB);
    return _mm_cvtsi128_si64(countA) + _mm_extract_epi64(countA, 1);
}
Jarib answered 6/4, 2022 at 22:8 Comment(2)
The trick for AVX2 is to do unaligned loads that split the 4 records you want across the middle of the register, instead of having the first record start at the bottom of the register. Then it's just one vpshufb, even with AVX-512VBMI vpermb. Similarly for 128-bit, chtz mentioned this trick in comments under the question. Unaligned load and pand to discard the low and high byte of the XMM. (Since the code has a mask anyway, we can fold it into that.) – Ryanryann
This requires special-casing the first element to avoid reading before the beginning, in case it starts at the beginning of a page. That can be movups / pslldq (byte shift) and fall / jump into the loop, or do record 0 scalar and do the first vector load at a 6-byte offset to get records 1 and 2. (Or with AVX2, at offset 5 to get records 1..4 lined up). One way might be better for minimizing cache-line-split loads. – Ryanryann
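As a rough illustration of that 128-bit variant, here is my sketch (untested, not code from the comments). It assumes records are stored contiguously, and record 0 plus a possible odd tail are handled with scalar code so the unaligned 16-byte loads never touch memory outside the buffer.

#include <immintrin.h>
#include <cstdint>
#include <cstring>

// Hypothetical sketch of the SSE4.1 approach described above.
// selectMask and maxVal hold the 7-byte SelectMask and 0x00FFFFFFFFFFFFFF
// in the low bytes of a uint64_t; records are 7 bytes each, contiguous.
uint64_t countSse(const char * records, size_t recordCount,
                  uint64_t selectMask, uint64_t maxVal)
{
    const uint64_t andMask = ~selectMask & maxVal; // record matches iff (record & andMask) == 0
    // record i sits in bits [8:63] of the low qword, record i+1 in bits [0:55] of the high qword
    const __m128i maskVec = _mm_set_epi64x((long long)andMask, (long long)(andMask << 8));
    const __m128i zero = _mm_setzero_si128();
    __m128i acc = zero;

    // record 0 is done with scalar code so the vector loads never read before the buffer
    uint64_t rec = 0;
    std::memcpy(&rec, records, 7);
    uint64_t rowsFound = ((rec & andMask) == 0);

    size_t i = 1;
    for (; i + 2 < recordCount; i += 2) {
        // 16-byte load placed so records i and i+1 occupy bytes 1..14;
        // the junk in bytes 0 and 15 is cleared by the mask
        __m128i v = _mm_loadu_si128((const __m128i *)(records + i * 7 - 1));
        v = _mm_and_si128(v, maskVec);
        __m128i isMatch = _mm_cmpeq_epi64(v, zero);  // -1 per matching record
        acc = _mm_sub_epi64(acc, isMatch);
    }
    rowsFound += (uint64_t)_mm_cvtsi128_si64(acc) + (uint64_t)_mm_extract_epi64(acc, 1);

    // scalar tail for the last one or two records (also avoids reading past the end)
    for (; i < recordCount; ++i) {
        rec = 0;
        std::memcpy(&rec, records + i * 7, 7);
        rowsFound += ((rec & andMask) == 0);
    }
    return rowsFound;
}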

Here’s another approach. This code doesn’t use unaligned-load tricks (which is especially valuable if you align your input data to 16 bytes), but it uses more instructions overall because of the extra shuffles, and it only operates on 16-byte SSE vectors.

I have no idea how it compares to the other answers; it may be either faster or slower. The code requires the SSSE3 and SSE4.1 instruction sets.

// Load 7 bytes from memory into the vector
inline __m128i load7( const uint8_t* rsi )
{
    __m128i v = _mm_loadu_si32( rsi );
    v = _mm_insert_epi16( v, *(const uint16_t*)( rsi + 4 ), 2 );
    v = _mm_insert_epi8( v, rsi[ 6 ], 6 );
    return v;
}

// Prepare mask vector: broadcast the mask, and duplicate the high byte
inline __m128i loadMask( uint64_t mask )
{
    __m128i vec = _mm_cvtsi64_si128( (int64_t)mask );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    return _mm_shuffle_epi8( vec, perm );
}

// Prepare needle vector: load 7 bytes, duplicate 7-th byte into 8-th, duplicate 8-byte lanes
inline __m128i loadNeedle( const uint8_t* needlePointer, __m128i mask )
{
    __m128i vec = load7( needlePointer );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    vec = _mm_shuffle_epi8( vec, perm );
    return _mm_and_si128( vec, mask );
}

// Compare first 14 bytes with the needle, update the accumulator
inline void compare14( __m128i& acc, __m128i vec, __m128i needle, __m128i mask )
{
    // Shuffle the vector to match the needle and mask layout; this duplicates the last byte of each 7-byte record
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10, 11, 12, 13, 13 );
    vec = _mm_shuffle_epi8( vec, perm );
    // bitwise AND with the mask
    vec = _mm_and_si128( vec, mask );
    // Compare 8-byte lanes for equality with the needle
    vec = _mm_cmpeq_epi64( vec, needle );
    // Increment the accumulator if comparison was true
    acc = _mm_sub_epi64( acc, vec );
}

size_t countRecords( const uint8_t* rsi, size_t count, const uint8_t* needlePointer, uint64_t maskValue )
{
    const __m128i mask = loadMask( maskValue );
    const __m128i needle = loadNeedle( needlePointer, mask );
    __m128i acc = _mm_setzero_si128();

    // An iteration of this loop consumes 16 records = 112 bytes = 7 SSE vectors
    const size_t countBlocks = count / 16;
    for( size_t i = 0; i < countBlocks; i++ )
    {
        const __m128i* p = ( const __m128i* )rsi;
        rsi += 7 * 16;

        __m128i a = _mm_loadu_si128( p );
        compare14( acc, a, needle, mask );

        __m128i b = _mm_loadu_si128( p + 1 );
        compare14( acc, _mm_alignr_epi8( b, a, 14 ), needle, mask );

        a = _mm_loadu_si128( p + 2 );
        compare14( acc, _mm_alignr_epi8( a, b, 12 ), needle, mask );

        b = _mm_loadu_si128( p + 3 );
        compare14( acc, _mm_alignr_epi8( b, a, 10 ), needle, mask );

        a = _mm_loadu_si128( p + 4 );
        compare14( acc, _mm_alignr_epi8( a, b, 8 ), needle, mask );

        b = _mm_loadu_si128( p + 5 );
        compare14( acc, _mm_alignr_epi8( b, a, 6 ), needle, mask );

        a = _mm_loadu_si128( p + 6 );
        compare14( acc, _mm_alignr_epi8( a, b, 4 ), needle, mask );
        compare14( acc, _mm_srli_si128( a, 2 ), needle, mask );
    }

    // Sum high / low lanes of the accumulator
    acc = _mm_add_epi64( acc, _mm_srli_si128( acc, 8 ) );

    // Handle the remainder, 7 bytes per iteration
    // Compared to your 6M records, the remainder is small, so its performance doesn't matter much.
    for( size_t i = 0; i < count % 16; i++ )
    {
        __m128i a = load7( rsi );
        rsi += 7;
        compare14( acc, a, needle, mask );
    }

    return (size_t)_mm_cvtsi128_si64( acc );
}
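One way to wire this up for the question's Select loop (my assumption, not stated in the answer): since the original test is equivalent to (record & ~SelectMask & maxVal) == 0, pass ~SelectMask & maxVal as the mask and an all-zero needle:

// Hypothetical glue code: SelectMask7 is the 7-byte SelectMask packed into the
// low bytes of a uint64_t, maxBytesValue is 0x00FFFFFFFFFFFFFF, and
// mData / recordCount come from the BitmapIndex in the question.
uint64_t maskValue = ~SelectMask7 & maxBytesValue;  // record matches iff (record & maskValue) == 0
const uint8_t zeroNeedle[7] = {};                   // needle = 0
size_t rowsFound = countRecords((const uint8_t *)mData, recordCount, zeroNeedle, maskValue);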

P.S. Also, I would expect 8-byte indices to be faster despite the 15% RAM bandwidth overhead, especially when vectorizing with AVX2.

Evetta answered 7/4, 2022 at 21:36 Comment(2)
Over large vectors, I'd expect to bottleneck on DRAM or L3 bandwidth, with that hiding unaligned-load costs. For smaller hot data, yeah, maybe a speedup from avoiding a shuffle if you have it already unpacked to 8-byte chunks in memory. – Ryanryann
On Intel CPUs at least before Ice Lake, this will bottleneck on 2 shuffles per load (2 shuffles per 16 bytes vs. 1 shuffle per 14 bytes for Harold's way). Could be worth trying, though. Not all of the loads will be cache-line splits for the unaligned strategy, and with cache capable of 2 loads per clock we have load throughput to spare without costing more front-end uops. And this palignr strategy only works for 128-bit vectors, or with AVX-512 for better 2-vector shuffles. (But AVX-512 valignd/q doesn't come in byte or word element-size.) – Ryanryann

First, your code is not a complete example. You're missing definitions and types of many variables, which makes it difficult to answer. You also did not indicate which platform you're compiling on/for.

Here are reasons why vectorization might fail:

  • Your reads are overlapping! You're reading 8 bytes at 7-byte intervals. That alone might confuse the vectorization logic.
  • Your pointers may not be __restrict'ed, meaning the compiler must assume they might alias and so may need to re-read from the address on every access.
  • Your equals() function pointer parameters are definitely not __restrict'ed (although the compiler could be seeing through that with inlining).
  • Alignment. x86_64 processors do not require aligned accesses, but on some platforms, some larger instructions need to know they work on properly aligned places in memory. Moreover, as @PeterCordes points out in a comment, compilers and libraries may be more picky than the hardware regarding alignment.
  • Why don't you put *SelectMask in a local variable? (A sketch of that change follows this list.)
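A minimal sketch of that last suggestion (my code, not the answerer's): hoist the mask load out of the loop and simplify the bit test, using memcpy for the unaligned loads. The names (SelectMask, byteSize, maxBytesValue, getRowPointer, recordCount) are taken from the question's Select function, where <cstring> is already included.

// Read SelectMask once, outside the loop; a record matches iff
// (record & maxVal & ~mask) == 0, which is equivalent to BitString::equals.
uint64_t mask = 0;
std::memcpy(&mask, SelectMask, byteSize);        // byteSize == 7
const uint64_t testMask = ~mask & maxBytesValue;
const char *indexRecord = getRowPointer(0);
uint64_t rowsFound = 0;
for (unsigned int i = 0; i < recordCount; ++i) {
    uint64_t rec = 0;
    std::memcpy(&rec, indexRecord, byteSize);    // 7-byte load, no out-of-bounds read
    rowsFound += ((rec & testMask) == 0);
    indexRecord += byteSize;
}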
Quasi answered 6/4, 2022 at 21:16 Comment(2)
GCC does not define the behaviour of deref of a misaligned uint64_t*. It often happens to work on x86-64, but can in practice fail: see my answer on Why does unaligned access to mmap'ed memory sometimes segfault on AMD64? and the couple links at the top of it to other blog articles. Equally importantly, this also needs memcpy or a typedef with __attribute__((may_alias)) as well as aligned(1) depending on how else the memory is accessed; see my comment on the question. – Ryanryann
This loop seems to be read-only, so I don't think aliasing is problematic. But yeah, passing a pointer to SelectMask every time is weird. – Ryanryann
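For completeness, a sketch of the GNU-specific typedef approach mentioned in those comments (my code, shown as a drop-in body for BitString::equals; the portable alternative is memcpy into a local uint64_t):

#include <cstdint>

// GNU C/C++ only: accesses through this typedef are allowed to be misaligned
// and to alias any other type.
typedef uint64_t aliasing_unaligned_u64 __attribute__((aligned(1), may_alias));

static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
    uint64_t m = *(const aliasing_unaligned_u64 *)mask;
    uint64_t r = maxVal & *(const aliasing_unaligned_u64 *)record;
    return (m & r) == r;   // same result as the original XOR/NOT expression
}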
