I have a program that performs some operations on a large number of files (> 10,000). It spawns N worker threads; each thread mmaps a file, does some work, and munmaps it.
The problem I am facing is that 1 process with N worker threads consistently performs worse than 2 processes, each with N/2 worker threads. I can see this in iotop: 1 process + N threads uses only around 75% of the disk bandwidth, whereas 2 processes + N/2 threads each saturate it.
Some notes:
- This happens only if I use mmap()/munmap(). I tried replacing them with fopen()/fread() and the problem went away (a sketch of that variant follows this list). But since the mmap()/munmap() calls come from a third-party library, I would like to keep them in their original form.
- madvise() is called with MADV_SEQUENTIAL, but removing it or changing the advice argument doesn't seem to change anything (or it just slows things down).
- Thread affinity doesn't seem to matter. I have tried to limit each thread to a specific core, and also to core pairs (Hyper-Threading); see the pinning sketch after this list. No results so far.
- Load reported by htop seems to be the same in both cases.
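For reference, the fopen()/fread() variant I used for the comparison looked roughly like this (a sketch, not the library's code; read_file and the 1 MiB chunk size are mine):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

// Reads a whole file into memory in fixed-size chunks.
std::vector<std::uint8_t> read_file(const std::string& file_path)
{
    std::vector<std::uint8_t> contents;
    FILE* f = fopen(file_path.c_str(), "rb");
    if (!f)
        return contents;
    std::vector<std::uint8_t> buffer(1 << 20); // 1 MiB chunks (arbitrary)
    std::size_t n;
    while ((n = fread(buffer.data(), 1, buffer.size(), f)) > 0)
        contents.insert(contents.end(), buffer.data(), buffer.data() + n);
    fclose(f);
    return contents;
}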
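And this is roughly how I pinned a worker to a core (Linux/glibc-specific; pin_to_core is my own helper, and g++ defines _GNU_SOURCE by default so pthread_setaffinity_np is available):

#include <pthread.h>
#include <sched.h>
#include <thread>

// Restricts a std::thread to a single logical core.
void pin_to_core(std::thread& t, int core)
{
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core, &set);
    pthread_setaffinity_np(t.native_handle(), sizeof(set), &set);
}

I called it right after creating each thread, e.g. pin_to_core(threads.back(), i % std::thread::hardware_concurrency()).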
So my questions are:
- Is there anything about mmap() I am not aware of when it is used in a multithreaded environment?
- If so, why do 2 processes have better performance?
EDIT:
- As pointed out in the comments, it is running on a server with 2 CPU sockets. I should probably try to set thread affinities so that everything runs on the same CPU, but I think I already tried that and it didn't work (there is also a numactl sketch after the code below).
- Here is a piece of code that reproduces the same issue as my production software (build and run commands follow it).
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <filesystem>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#ifndef WORKERS
#define WORKERS 16
#endif

// Shared work-queue state; 'stop' is only read and written while
// holding queue_mutex so that the condition variable wakeups are safe.
bool stop = false;
std::mutex queue_mutex;
std::condition_variable queue_cv;

// Maps a whole file read-only; returns {nullptr, 0} on any failure.
std::pair<const std::uint8_t*, std::size_t> map_file(const std::string& file_path)
{
    int fd = open(file_path.c_str(), O_RDONLY);
    if (fd != -1)
    {
        auto dir_ent = std::filesystem::directory_entry{file_path};
        if (dir_ent.is_regular_file())
        {
            auto size = dir_ent.file_size();
            auto data = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd, 0);
            close(fd); // the mapping stays valid after the fd is closed
            if (data != MAP_FAILED)
            {
                madvise(data, size, MADV_SEQUENTIAL);
                return { reinterpret_cast<const std::uint8_t*>(data), size };
            }
            return { nullptr, 0 };
        }
        close(fd);
    }
    return { nullptr, 0 };
}

void unmap_file(const std::uint8_t* data, std::size_t size)
{
    munmap(const_cast<std::uint8_t*>(data), size);
}

int main(int argc, char* argv[])
{
    if (argc < 2)
        return 1;
    std::deque<std::string> queue;
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < WORKERS; ++i)
    {
        threads.emplace_back(
            [&]() {
                std::string path;
                while (true)
                {
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        while (!stop && queue.empty())
                            queue_cv.wait(lock);
                        if (stop && queue.empty())
                            return;
                        path = queue.front();
                        queue.pop_front();
                    }
                    auto [data, size] = map_file(path);
                    if (!data)
                        continue;
                    // XOR every byte so the kernel actually has to read the
                    // whole file and the loop is not optimized away.
                    std::uint8_t b = 0;
                    for (auto itr = data; itr < data + size; ++itr)
                        b ^= *itr;
                    unmap_file(data, size);
                    std::cout << (int)b << std::endl;
                }
            }
        );
    }
    for (auto& p : std::filesystem::recursive_directory_iterator{argv[1]})
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (p.is_regular_file())
        {
            queue.push_back(p.path().native());
            queue_cv.notify_one();
        }
    }
    {
        // Set 'stop' under the mutex; writing it unlocked is a data race.
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    queue_cv.notify_all();
    for (auto& t : threads)
        t.join();
    return 0;
}
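I build and run the reproducer like this (repro.cpp is just my file name; on GCC 8 you also need -lstdc++fs for std::filesystem):

g++ -std=c++17 -O2 -pthread -DWORKERS=16 repro.cpp -o repro
./repro /path/to/files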
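For the NUMA angle from the EDIT above, the next thing I plan to try is binding the whole process to one node, along these lines (node 0 is just an example; the real node IDs come from numactl --hardware):

numactl --cpunodebind=0 --membind=0 ./repro /path/to/files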