Producer/Consumer using Boost.Fibers
Asked Answered
E

1

0

I'm trying to create producer/consumer using Boost.Fibers. Looks like using channels from this example is the right thing to do. The example have to be changed slightly since I want to signal completion using promise/future. So I wrote some naive code to do no work, just signal the completion.

struct fiber_worker {
    fiber_worker() {
        wthread = std::thread([self{this}]() {
            for (int i = 0; i < 4; ++i) {
                boost::fibers::fiber{
                        [self]() {
                            task tsk;
                            while (boost::fibers::channel_op_status::closed != self->ch.pop(tsk)) {
                                tsk();
                            }
                        }}.detach();
            }
            task tsk;
            while (boost::fibers::channel_op_status::closed != self->ch.pop(tsk)) {
                tsk();
            }
        });
    }

    boost::fibers::future<void> submit() noexcept {
        boost::fibers::promise<void> prom;
        auto retVal = prom.get_future();
        ch.push([p{std::move(prom)}]() mutable { p.set_value(); });
        return retVal;
    }

    ~fiber_worker() {
        ch.close();
        wthread.join();
    }

    using task = std::function<void()>;
    std::thread wthread;
    boost::fibers::buffered_channel<task> ch{1024};
};

However, it would not compile, it will complain about promise deleted copy constructor being accessed. First of all I dont get where (and why) is the copy constructor is being called. Second I'm not sure this is the way the boost::fibers should be used.
The usage

int main() {
        fiber_worker bwk;
        bwk.submit().get();
}

The error message

In file included from /usr/include/c++/7/future:48:0, from /home/user/Downloads/boost_1_66_0/boost/fiber/exceptions.hpp:12, from /home/user/Downloads/boost_1_66_0/boost/fiber/future/future.hpp:17, from /home/user/Development/Tests/shared_state_test/main.cpp:4: /usr/include/c++/7/bits/std_function.h: In instantiation of ‘static void std::_Function_base::_Base_manager<_Functor>::_M_clone(std::_Any_data&, const std::_Any_data&, std::false_type) [with _Functor = fiber_worker::submit()::; std::false_type = std::integral_constant]’: /usr/include/c++/7/bits/std_function.h:227:16: required from ‘static bool std::_Function_base::_Base_manager<_Functor>::_M_manager(std::_Any_data&, const std::_Any_data&, std::_Manager_operation) [with _Functor = fiber_worker::submit()::]’ /usr/include/c++/7/bits/std_function.h:695:19: required from ‘std::function<_Res(_ArgTypes ...)>::function(_Functor) [with _Functor = fiber_worker::submit()::; = void; = void; _Res = void; _ArgTypes = {}]’ /home/user/Development/Tests/shared_state_test/main.cpp:45:66:
required from here /usr/include/c++/7/bits/std_function.h:192:6: error: use of deleted function ‘fiber_worker::submit()::::(const fiber_worker::submit()::&)’ new _Functor(__source._M_access<_Functor>()); ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /home/user/Development/Tests/shared_state_test/main.cpp:45:36: note: ‘fiber_worker::submit()::::(const fiber_worker::submit()::&)’ is implicitly deleted because the default definition would be ill-formed: ch.push(p{std::move(prom)} mutable { p.set_value(); }); ^ /home/user/Development/Tests/shared_state_test/main.cpp:45:36: error: use of deleted function ‘boost::fibers::promise::promise(const boost::fibers::promise&)’ In file included from /home/user/Development/Tests/shared_state_test/main.cpp:5:0: /home/user/Downloads/boost_1_66_0/boost/fiber/future/promise.hpp:192:5: note: declared here promise( promise const&) = delete; ^~~~~~~

EDIT001: Looks like the channel is not capable of using moving lambdas

struct test {
    test() = default;

    test(const test &rhs) = delete;

    test &operator=(const test &rhs)= delete;

    test(test &&rhs) = default;

    test &operator=(test &&rhs)= default;

    size_t _1 = 0;
    size_t _2 = 0;
    size_t _3 = 0;
    size_t _4 = 0;

    void print() const {
        std::cout << _1 << _2 << _3 << 4 << std::endl;
    }
};

int main() {
    using task = std::function<void()>;
    boost::fibers::buffered_channel<task> ch{1024};
    test tst;
    ch.push([t{std::move(tst)}]() { t.print(); });
}

Will contact boost::fibers maintainer for clarification

EDIT002: There is no problem with boost::fibers::buffered_channel the only problem here with my Alzheimer, I (again) forgot that std::function must be copyable, and when the lambda captures only movable type std::function creation will fail on copy

Elsewhere answered 15/4, 2018 at 8:3 Comment(3)
The channel clearly requires tasks to be copyable.Merissa
Yep, already figured it. I forgot (again) that std::function must be copyable and copyconstructibleElsewhere
@sehe, BTW, boost::fibers::unbuffered_channel can accept move-only entities, why the buffered one cant? especially when buffered push can accept lvalues?Elsewhere
A
0

I believe the issue isn't that the unbuffered channel can't pass movable items (one of the overloads of push takes T&&), but that it requires them to be default-constructible so that it can pre-populate the queue with elements to move the pushed item into.

Amarelle answered 8/10, 2019 at 4:55 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.