Possible std::async implementation bug on Windows

It seems like there is a bug in the Windows implementation of std::async. Under heavy load (on the order of 1000 tasks launched via std::async per second), async tasks are never scheduled, and waiting on the returned futures leads to deadlock. See this piece of code (modified to use the deferred launch policy instead of async):

BundlingChunk(size_t numberOfInputs, Bundler* parent, ChunkIdType chunkId)
        : m_numberOfInputs(numberOfInputs), m_parent(parent), m_chunkId(chunkId)
    {
        const BundlerChunkDescription& chunk = m_parent->m_chunks[m_chunkId];
        const ChunkInfo& original = chunk.m_original;
        auto& deserializers = m_parent->m_deserializers;

        // Fetch all chunks (in parallel with std::launch::async; lazily with the deferred workaround used here).
        std::vector<std::map<ChunkIdType, std::shared_future<ChunkPtr>>> chunks;
        chunks.resize(chunk.m_secondaryChunks.size());
        static std::atomic<unsigned long long> chunksInProgress{ 0 }; // debug counter, checked after the deadlock

        for (size_t i = 0; i < chunk.m_secondaryChunks.size(); ++i)
        {
            for (const auto& c : chunk.m_secondaryChunks[i])
            {
                const auto chunkCreationLambda = ([this, c, i] {
                    chunksInProgress++;
                    ChunkPtr chunk = m_parent->m_weakChunkTable[i][c].lock();
                    if (chunk) {
                        chunksInProgress--;
                        return chunk;
                    }
                    chunksInProgress--;
                    return m_parent->m_deserializers[i]->GetChunk(c);
                });
                // Deferred tasks run lazily on the thread that first calls get()/wait().
                std::future<ChunkPtr> chunkCreateFuture = std::async(std::launch::deferred, chunkCreationLambda);
                chunks[i].emplace(c, chunkCreateFuture.share());
            }
        }

        std::vector<SequenceInfo> sequences;
        sequences.reserve(original.m_numberOfSequences);

        // Creating chunk mapping.
        m_parent->m_primaryDeserializer->SequenceInfosForChunk(original.m_id, sequences);
        ChunkPtr drivingChunk = chunks.front().find(original.m_id)->second.get();
        m_sequenceToSequence.resize(deserializers.size() * sequences.size());
        m_innerChunks.resize(deserializers.size() * sequences.size());
        for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
        {
            if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
            {
                continue;
            }

            size_t currentIndex = sequenceIndex * deserializers.size();
            m_sequenceToSequence[currentIndex] = sequences[sequenceIndex].m_indexInChunk;
            m_innerChunks[currentIndex] = drivingChunk;
        }

        // Creating sequence mapping and requiring underlying chunks.
        SequenceInfo s;
        for (size_t deserializerIndex = 1; deserializerIndex < deserializers.size(); ++deserializerIndex)
        {
            auto& chunkTable = m_parent->m_weakChunkTable[deserializerIndex];
            for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
            {
                if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
                {
                    continue;
                }

                size_t currentIndex = sequenceIndex * deserializers.size() + deserializerIndex;
                bool exists = deserializers[deserializerIndex]->GetSequenceInfo(sequences[sequenceIndex], s);
                if (!exists)
                {
                    if(m_parent->m_verbosity >= (int)TraceLevel::Warning)
                        fprintf(stderr, "Warning: sequence '%s' could not be found in the deserializer responsible for stream '%ls'\n",
                            m_parent->m_corpus->IdToKey(sequences[sequenceIndex].m_key.m_sequence).c_str(),
                            deserializers[deserializerIndex]->StreamInfos().front().m_name.c_str());
                    m_sequenceToSequence[currentIndex] = SIZE_MAX;
                    continue;
                }

                m_sequenceToSequence[currentIndex] = s.m_indexInChunk;
                ChunkPtr secondaryChunk = chunkTable[s.m_chunkId].lock();
                if (!secondaryChunk)
                {
                    secondaryChunk = chunks[deserializerIndex].find(s.m_chunkId)->second.get();
                    chunkTable[s.m_chunkId] = secondaryChunk;
                }

                m_innerChunks[currentIndex] = secondaryChunk;
            }
        }
    }

My version above is modified so that the async tasks are launched as deferred instead of async, which fixes the issue. Has anyone else seen something like this as of VS2017 redistributable 14.12.25810? Reproducing the issue is as easy as training a CNTK model that uses the text and image readers on a machine with a GPU and an SSD, so that CPU deserialization becomes the bottleneck; after about 30 minutes of training, a deadlock usually occurs. Has anyone seen a similar issue on Linux? If so, it could be a bug in the code, although I doubt it, because the debug counter chunksInProgress is always 0 after the deadlock. For reference, the entire source file is located at https://github.com/Microsoft/CNTK/blob/455aef80eeff675c0f85c6e34a03cb73a4693bff/Source/Readers/ReaderLib/Bundler.cpp.
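
To make this concrete without pulling in CNTK, the pattern in question boils down to something like the following stress sketch (illustrative only, with made-up numbers; whether it actually reproduces the hang will depend on the machine and the load):

#include <future>
#include <vector>

int main()
{
    // Launch bursts of trivial tasks and wait on every returned future,
    // sharing them the same way the reader code does. The hypothesis is
    // that with std::launch::async this eventually hangs in get() on the
    // affected MSVC runtime, while std::launch::deferred never does.
    for (;;)
    {
        std::vector<std::shared_future<int>> futures;
        for (int i = 0; i < 1000; ++i)
        {
            futures.push_back(std::async(std::launch::async, [i] { return i; }).share());
        }
        for (auto& f : futures)
        {
            f.get();
        }
    }
}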

Synthesize answered 17/6, 2018 at 17:32 Comment(11)
Use my library instead. std::async under MSVC uses PPL, which is implemented badly: github.com/David-Haim/concurrencppGusella
@DavidHaim Are you implying that some random header with no docs, no tests, and no readme of any kind would be a better alternative to a library from a top vendor? Extraordinary claims.Scapegoat
@DavidHaim No it doesn't. Not nowadays anyway, and I doubt it ever did; pity I can't downvote a comment.Hedi
@PaulSanders Yes, it is. A future/promise duo is just a wrapper for concurrency::task. Pity you never checked; it has always been this implementation.Gusella
@DavidHaim So sorry, it seems you're right; I just stepped through about 4 million templates and you do indeed wind up inside ppltasks.h. I see, however, that this is built on top of Windows' native ThreadPool API (see here), so what makes you think it's such a lemon?Hedi
@DavidHaim [Meta comment] And that will teach me not to rush when the wife is inviting me out for beer and poppadums. I really am sorry, I owe you one.Hedi
@PaulSanders It's just that this implementation allocates maybe ~6 different chunks of memory for a simple task and makes 2 extremely heavy system calls (the Win32 threadpool thingy); it's just not scalable. In my implementation I only allocate 2 chunks - the shared state and the threadpool task. Also, the threadpool is implemented in user mode, so no heavy system calls.Gusella
@DavidHaim Hmmm. From here I read: A thread pool is a collection of worker threads that efficiently execute asynchronous callbacks on behalf of the application. Looks to me like Microsoft went to some trouble to get this right, and I would actually anticipate that the thread management part of said thread pools is implemented in user mode anyway (why wouldn't it be?). But maybe you have benchmarked this. I certainly haven't.Hedi
@Synthesize I have rewritten my answer. I think you might be able to profit from it now, LMK (vote / accept will do :).Hedi
@DavidHaim I spent a bit of time studying the behaviour of std::async on Windows and concluded that this is governed by the way the underlying threadpool behaves (and indeed is designed to behave), so I wouldn't say that PPL is implemented 'badly', although how the threadpool it is built on works might not be to everyone's taste. More details here.Hedi
For the love of god, use a thread pool.Eck

New day, better answer (much better). Read on.

I spent some time investigating the behaviour of std::async on Windows, and you're right: it's a different animal; see here.

So, if your code relies on std::async always starting a new thread of execution and returning immediately then you can't use it. Not on Windows, anyway. On my machine, the limit seems to be 768 background threads, which would fit in, more or less, with what you have observed.
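
If you want to see this pool-like behaviour for yourself, here is a rough probe (a sketch of my own, not a benchmark): it tracks how many tasks are actually running at once. On MSVC I would expect the peak to stay far below the number of tasks, because they are queued to the Windows threadpool; with libstdc++/libc++, where every std::launch::async task gets its own thread, the peak approaches the task count.

#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main ()
{
    constexpr int num_tasks = 1000;
    std::atomic <int> running {0};
    std::atomic <int> peak {0};

    std::vector <std::future <void>> futures;
    for (int i = 0; i < num_tasks; ++i)
    {
        futures.push_back (std::async (std::launch::async, [&running, &peak]
        {
            // Record the high-water mark of concurrently running tasks.
            int now = ++running;
            for (int prev = peak.load (); now > prev; )
                if (peak.compare_exchange_weak (prev, now))
                    break;
            std::this_thread::sleep_for (std::chrono::milliseconds (50));
            --running;
        }));
    }

    for (auto& f : futures)
        f.get ();

    std::cout << "peak concurrency : " << peak << std::endl;
    return 0;
}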

Anyway, I wanted to learn a bit more about modern C++, so I had a crack at rolling my own replacement for std::async that can be used on Windows with the semantics desired by the OP. I therefore humbly present the following:

AsyncTask: drop-in replacement for std::async

#include <future>
#include <thread>

template <class Func, class... Args>
    std::future <std::result_of_t <std::decay_t <Func> (std::decay_t <Args>...)>>
        AsyncTask (Func&& f, Args&&... args)
{
    using decay_func = std::decay_t <Func>;
    using return_type = std::result_of_t <decay_func (std::decay_t <Args>...)>;

    std::packaged_task <return_type (decay_func f, std::decay_t <Args>... args)>
        task ([] (decay_func f, std::decay_t <Args>... args)
    {
        return f (args...);
    });

    auto task_future = task.get_future ();
    std::thread t (std::move (task), f, std::forward <Args> (args)...);
    t.detach ();    // note: unlike std::async, nothing ever joins this thread (but see the comments)
    return task_future;
}

Test program

#include <iostream>
#include <string>

int add_two_integers (int a, int b)
{
    return a + b;
}

std::string append_to_string (const std::string& s)
{
    return s + " addendum";
}

int main ()
{
    auto /* i.e. std::future <int> */ f1 = AsyncTask (add_two_integers, 1, 2);
    auto /* i.e. int */  i = f1.get ();
    std::cout << "add_two_integers : " << i << std::endl;

    auto /* i.e. std::future <std::string> */ f2 = AsyncTask (append_to_string, "Hello world");
    auto /* i.e. std::string */ s = f2.get ();
    std::cout << "append_to_string : " << s << std::endl;
    return 0;  
}

Output

add_two_integers : 3
append_to_string : Hello world addendum

Live demo here (gcc) and here (clang).

I learnt a lot from writing this and it was a lot of fun. I'm fairly new to this stuff, so all comments welcome. I'll be happy to update this post if I've got anything wrong.

Hedi answered 17/6, 2018 at 17:41 Comment(3)
I am aware that most implementations of std::async use a thread pool and schedule the async tasks among its threads, which I think was the reason why the authors of this code chose to use it (oversubscription would definitely occur with so many tasks submitted per second). But for some reason, some tasks submitted via std::async while the system is under heavy load are never executed. The thread pool can be bypassed by using the launch policy deferred, which works, and is why I suspect that the bug is related to the std::async implementation.Synthesize
Hmmm. Have you tried to reduce your code to a minimal reproducible example? If your claim that std::async is broken is true, then it should not be hard to reproduce the issue in simpler code that (a) can easily be seen to have no inherent problems of its own, and (b) can be tested by other people to see if they can reproduce the issue for themselves. That's generally how things work around here.Hedi
Beware that this is also not a standard-conforming implementation: std::async does not return a normal future, but a special one that joins the thread on destruction, i.e. blocks on destruction.Giorgio

Inspired by Paul Sanders' answer, I tried to simplify his code a little bit:

#include <functional>
#include <future>
#include <thread>
#include <type_traits>

template <class Func, class... Args>
[[nodiscard]] std::future<std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>>
RunInThread(Func&& func, Args&&... args) {
  using return_type = std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>;

  auto bound_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
  std::packaged_task<return_type()> task(std::move(bound_func));
  auto task_future = task.get_future();
  std::thread(std::move(task)).detach();  // detached, exactly as in the answer above
  return task_future;
}

Unfortunately, neither this nor Paul's implementation is standard-conforming. The future returned by std::async is not a normal future but a special one: it can't be destroyed until the task is finished, i.e. it calls join() on the task thread in its destructor.

This feature of std::future is not accessible to anyone but std::async, so in order to get the correct behavior you have to implement it yourself. I put a full implementation of all of this in a gist, because it's quite lengthy.
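
To give a flavour of what that entails, here is a minimal sketch of the idea (the naming is mine, and this is a simplification of what the gist actually does): keep the std::thread alongside the future and join it in the destructor, so that destroying the future blocks until the task has finished, just as it does with std::async.

#include <functional>
#include <future>
#include <thread>
#include <type_traits>
#include <utility>

// A future that, like the one std::async returns, blocks in its
// destructor until the associated thread has finished.
template <class T>
class JoiningFuture {
public:
  JoiningFuture(std::future<T> future, std::thread thread)
    : future_(std::move(future)), thread_(std::move(thread)) {}
  JoiningFuture(JoiningFuture&&) = default;
  ~JoiningFuture() {
    if (thread_.joinable())
      thread_.join();
  }

  T get() { return future_.get(); }

private:
  std::future<T> future_;
  std::thread thread_;
};

template <class Func, class... Args>
auto RunInThreadJoining(Func&& func, Args&&... args) {
  using return_type = std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>;
  std::packaged_task<return_type()> task(
    std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
  auto task_future = task.get_future();
  // Keep the thread instead of detaching it, so the wrapper can join it.
  return JoiningFuture<return_type>(std::move(task_future), std::thread(std::move(task)));
}

Calling get() still works as usual; the join in the destructor only matters when the wrapper is destroyed while the task is still in flight.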

Giorgio answered 3/7, 2020 at 8:28 Comment(0)
