Memory leak in gRPC async_client
I am using the gRPC async client in a similar way to the example.

In this example (published in the official gRPC GitHub repository), the client allocates memory for the outgoing message, uses that object's address as the tag for the completion queue, and, when the reply arrives in the listener thread, frees the memory identified by the tag (the address).

I'm worried about the situation where the server never responds to a message and the memory is never freed.

  • Does gRPC protect me from this situation?
  • Should I implement it differently (using smart pointers, saving the pointers in a data structure, etc.)?

Async client send function

void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;

    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();

    // Request that, upon completion of the RPC, "reply" be updated with the
    // server's response and "status" with whether the operation succeeded.
    // Tag the request with the memory address of the call object.
    call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}

Async client receive function for thread

void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);

        if (call->status.ok())
            std::cout << "Greeter received: " << call->reply.message() << std::endl;
        else
            std::cout << "RPC failed" << std::endl;

        // Once we're complete, deallocate the call object.
        delete call;
    }
}

Main

int main(int argc, char** argv) {
    GreeterClient greeter(grpc::CreateChannel(
            "localhost:50051", grpc::InsecureChannelCredentials()));

    // Spawn reader thread that loops indefinitely
    std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

    for (int i = 0; i < 100; i++) {
        std::string user("world " + std::to_string(i));
        greeter.SayHello(user);  // The actual RPC call!
    }

    std::cout << "Press control-c to quit" << std::endl << std::endl;
    thread_.join();  //blocks forever

    return 0;
}
Palmer answered 23/12, 2020 at 12:59 Comment(0)

Does gRPC protect me from this situation?

Kinda. gRPC guarantees that every queued operation will eventually surface in its matching completion queue. So your code is OK as long as:

  • No exception is thrown at an unfortunate time.
  • No future change introduces a code path that skips either queuing the operation or deleting the call.

In other words: it's OK, but fragile.

Option A:

If you want to be truly robust, the way to go is std::shared_ptr<>. However, it can affect multithreaded performance in unexpected ways, since every ref-count update is an atomic operation. So whether it's worth it depends on where your app lands on the performance-vs-robustness spectrum.

Such a refactor would look like:

  1. Have AsyncClientCall inherit from std::enable_shared_from_this<AsyncClientCall>
  2. Change the construction of call to std::make_shared<AsyncClientCall>(), pass call.get() as the tag to Finish(), and keep at least one shared_ptr to the call alive while the RPC is in flight (otherwise shared_from_this() in the handler is undefined behavior)
  3. In the completion queue handler, recover an owning reference from the tag:
while (cq_.Next(&got_tag, &ok)) {
    auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();

And get rid of the delete, obviously.

Option B:

You can also get a decent halfway measure with unique_ptr<>:

    auto call = std::make_unique<AsyncClientCall>();
    ...
    // Release into a named pointer before the Finish() call: mixing
    // call.release() with &call->reply in a single argument list risks
    // dereferencing a null unique_ptr, because argument evaluation order
    // is unspecified.
    AsyncClientCall* raw = call.release();
    raw->response_reader->Finish(&raw->reply, &raw->status, (void*)raw);

and

    std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};

This guards against refactors and exceptions while keeping everything else the same. However, it is only usable for unary RPCs that produce a single completion event. Streaming RPCs, or RPCs that exchange metadata, will need completely different handling.

Faraday answered 19/7, 2021 at 14:27 Comment(9)
Thanks for the answer. What happens in a situation where the server isn't responding to some messages? Would the messages just stay and wait in the queue? Can the queue overflow? Can we clean old messages out of the queue in this situation? – Palmer
@Palmer If the server doesn't respond, the RPC will fail sooner or later, either by timeout, by the socket closing, or by being cancelled somehow. At that point, the tag will be placed in the queue with a non-OK status. – Faraday
"Can we clean the queue of old messages in this situation?" You can always cancel a call. But generally, adding a deadline is plenty. – Faraday
What do you mean by cancel the call? – Palmer
TryCancel(), but as I said, set_deadline() is generally preferred because it's fire-and-forget. – Faraday
Also, just to make sure you are not confused: the only public-facing notion of a "queue" in gRPC is the completion queue, which does not contain all in-flight RPCs, only the ones that have finished user-requested operations. – Faraday
Great, thanks @Frank, could you please add this information to the answer? – Palmer
For Option A, the arguments to Finish() should include a get() on the smart pointer: call->response_reader->Finish(&call->reply, &call->status, (void*)call.get()); – Stoichiometry
call->response_reader->Finish(&call->reply, &call->status, (void*)call.release()); – are you sure that call.release() will be evaluated after &call->reply and &call->status? If the arguments are evaluated right to left, this could produce an access violation. link – Villous
