I have a fairly simple loop:
auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
rowsFound += equals;
indexRecord += byteSize; // byteSize is 7
}
Where BitString::equals
is:
static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}
This code is used to simulate a Bitmap Index querying in databases.
My question is, if there's a way to vectorize the loop, going through all the records.
When trying to compile with GCC and -fopt-info-vec-missed -O3
I am getting: missed: couldn't vectorize loop
.
I am new to this kind of optimizations and would like to learn more, it just feels like I am missing something.
EDIT
First of all, thank you all for answers. I should've included a Reprex.
Here it is now, with all functionality needed, as close as possible I could've done. All of this is done on x86-64
platform and I have both GCC and Clang available.
#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>
constexpr short BYTE_SIZE = 8;
class BitString {
public:
static int getByteSizeFromBits(int bitSize) {
return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
}
static void setBitString(char *rec, int bitOffset) {
rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
}
static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
}
};
// Class representing a table schema
class TableSchema {
public:
// number of attributes of a table
unsigned int attrs_count = -1;
// the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
unsigned int *attr_sizes = nullptr;
// max value (domain) of an attribute, -1 for unlimited, ()
int *attr_max_values = nullptr;
// the offset of each attribute, to simplify some pointer arithmetic for further use
unsigned int *attribute_offsets = nullptr;
// sum of attr_sizes if the record size;
unsigned int record_size = -1;
void calculate_offsets() {
if (attrs_count <= 0 || attribute_offsets != nullptr) {
return;
}
attribute_offsets = new unsigned int[attrs_count];
int offset = 0;
for (int i = 0; i < attrs_count; ++i) {
attribute_offsets[i] = offset;
offset += attr_sizes[i];
}
record_size = offset;
}
TableSchema() = default;
~TableSchema() {
if (attribute_offsets != nullptr) {
delete[] attribute_offsets;
attribute_offsets = nullptr;
}
attrs_count = -1;
}
};
class BitmapIndex {
private:
char *mData = nullptr;
short bitSize = 0;
int byteSize = 0;
int attrsCount = 0;
int *attrsMaxValue = nullptr;
int *bitIndexAttributeOffset = nullptr;
unsigned int recordCount = 0;
char *SelectMask;
unsigned int capacity = 0;
inline char *getRowPointer(unsigned int rowId) const {
return mData + rowId * byteSize;
}
inline bool shouldColBeIndexed(int max_col_value) const {
return max_col_value > 0;
}
public:
BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
auto maxValuesSum = 0;
attrsMaxValue = new int[attrs_count];
attrsCount = attrs_count;
bitIndexAttributeOffset = new int[attrs_count];
auto bitOffset = 0;
// attribute's max value is the same as number of bits used to encode the current value
// e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
for (int i = 0; i < attrs_count; ++i) {
attrsMaxValue[i] = attrs_max_value[i];
bitIndexAttributeOffset[i] = bitOffset;
// col is indexed only if it's max value is > 0, -1 means
if (!shouldColBeIndexed(attrs_max_value[i]))
continue;
maxValuesSum += attrs_max_value[i];
bitOffset += attrs_max_value[i];
}
bitSize = (short) maxValuesSum;
byteSize = BitString::getByteSizeFromBits(bitSize);
mData = new char[byteSize * capacity];
memset(mData, 0, byteSize * capacity);
SelectMask = new char[byteSize];
this->capacity = capacity;
}
~BitmapIndex() {
if (mData != nullptr) {
delete[] mData;
mData = nullptr;
delete[] attrsMaxValue;
attrsMaxValue = nullptr;
delete[] SelectMask;
SelectMask = nullptr;
}
}
unsigned long getTotalByteSize() const {
return byteSize * capacity;
}
// add record to index
void addRecord(const char * record, const unsigned int * attribute_sizes) {
auto indexRecord = getRowPointer(recordCount);
unsigned int offset = 0;
for (int j = 0; j < attrsCount; ++j) {
if (attrsMaxValue[j] != -1) {
// byte col value
char colValue = *(record + offset);
if (colValue > attrsMaxValue[j]) {
throw std::runtime_error("Col value is bigger than max allowed value!");
}
// printf("%d ", colValue);
BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
}
offset += attribute_sizes[j];
}
recordCount += 1;
}
// SELECT COUNT(*)
int Select(const char *query) const {
uint64_t rowsFound = 0;
memset(SelectMask, 0, byteSize);
for (int col = 0; col < attrsCount; ++col) {
if (!shouldColBeIndexed(attrsMaxValue[col])) {
continue;
}
auto col_value = query[col];
if (col_value < 0) {
for (int i = 0; i < attrsMaxValue[col]; ++i) {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
}
} else {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
}
}
uint64_t maxBytesValue = 0;
uint64_t byteVals = 0xff;
for (int i = 0; i < byteSize; ++i) {
maxBytesValue |= byteVals << (i * 8);
}
auto indexRecord = getRowPointer(0);
for (int i = 0; i < recordCount; ++i) {
rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
indexRecord += byteSize;
}
return rowsFound;
}
};
void generateRecord(
char *record,
const unsigned int attr_sizes[],
const int attr_max_value[],
int attr_count
) {
auto offset = 0;
for (int c = 0; c < attr_count; ++c) {
if (attr_max_value[c] == -1) {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % 256;
}
} else {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % attr_max_value[c];
}
}
offset += attr_sizes[c];
}
}
int main() {
TableSchema schema;
const int attribute_count = 13;
const int record_count = 1000000;
// for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
schema.attrs_count = attribute_count;
schema.attr_sizes = attr_sizes;
schema.attr_max_values = attr_max_values;
schema.calculate_offsets();
srand((unsigned ) time(nullptr));
BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);
char *record = new char[schema.record_size];
for (int i = 0; i < record_count; ++i) {
// generate some random records and add them to the index
generateRecord(record, attr_sizes, attr_max_values, attribute_count);
bitmapIndex.addRecord(record, attr_sizes);
}
char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
// simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
auto found = bitmapIndex.Select(query);
printf("Query found: %d records\n", found);
delete[] record;
return 0;
}
[8:119]
of an SSE register. Your bit-operations boil down to(~mask & maxVal & record) != 0)
. In your exampleSelectMask
does not change so~mask & maxVal
could be computed outside the loop, which means your loop would just be apand
(where the mask needs to be loaded once between bits[8:63]
and[64:119]
), ` pcmpeqq` with 0 and apsubq
to accumulate. With AVX2, you could load two pairs using a 128bit load and a 128bit insert -- if you want a working solution, post a minimal reproducible example!! (Not sure what you can achieve with auto-vectorization.) – Clap*( uint64_t * ) mask
- I hope you're compiling with-fno-strict-aliasing
, otherwise this is unsafe. (Unless the only other accesses to the storage are viachar*
.) Actually still unsafe with misaligneduint64_t*
. You could be using GNU Ctypedef uint64_t aliasing_unaligned_u64 __attribute__((aligned(1),may_alias))
as shown in Why does unaligned access to mmap'ed memory sometimes segfault on AMD64?. Manual vectorization is fairly straightforward, though, as chtz describes, loading so two 7-byte records are split across the middle of a 16-byte vec. – Ryanryannclass BitString
would make more sense as anamespace BitString{}
, since only containsstatic
functions. Also,constexpr short BYTE_SIZE = 8;
already has a name in C,CHAR_BIT
or in C++std::numeric_limits<unsigned char>::digits
. And if you don't care how narrow a constant is, make itint
notshort
. Fortunately C++ promotes narrow types toint
anyway sobitSize + BYTE_SIZE
doesn't truncate anything to short. – Ryanryann