Can someone explain to me what's the proper usage of gRPC StreamObserver.onError?
Asked Answered
W

3

7

I am trying to handle gRPC errors properly (Java, Spring-boot app).

Basically, I need too transfer error details from gRPC server to client, but I find it hard to understand the proper usage of StreamObserver.onError();

The method doc says:

"Receives a terminating error from the stream. May only be called once and if called it must be the last method called. In particular if an exception is thrown by an implementation of onError no further calls to any method are allowed."

What does this "no further calls are allowed" mean? In the app that I maintain, they call other gRPC methods and they get java.lang.IllegalStateException: call already closed which is just fine, as per documentation.

I am wondering - should I (the developer) terminate the current java method (which usus gRPC calls) after an error is received? Like for example throwing an exception to stop execution. Or it is expected tht gRPC is going to terminate the execution.. (something like throwing an exception from gRPC)

Basically how do I properly use onError() and what should I expect and handle if I call it? I need an explanation of its usage and effects.

Wittman answered 8/10, 2020 at 7:29 Comment(0)
E
12

There are two StreamObserver instances involved. One is for the inbound direction, which is the StreamObserver instance you implement and pass to the gRPC library. This is the StreamObserver containing your logic for how to handle responses. The other is for the outbound direction, which is the StreamObserver instance that gRPC library returns to you when calling the RPC method. This is the StreamObserver that you use to send requests. Most of the time, these two StreamObservers are interacting with each other (e.g., in a fully duplexed streaming call, the response StreamObserver usually calls the request StreamObserver's onNext() method, this is how you achieve ping-pong behavior).

"no further calls are allowed" means you should not call any more onNext(), onComplete() and/or onError() on the outbound direction StreamObserver when the inbound StreamObserver's onError() method is invoked, even if your implementation for the inbound onError() throws an exception. Since the inbound StreamObserver is invoked asynchronously, it has nothing to do with your method that encloses the StreamObserver's implementation.

For example:


public class HelloWorld {
  private final HelloWorldStub stub;
  private StreamObserver<HelloRequest> requestObserver;

  ...

  private void sendRequest(String message) {
    requestObserver.onNext(HelloRequest.newBuilder.setMessage(message).build());
  }

  public void start() {
    stub.helloWorld(new StreamObserver<HelloResponse> {
      @Override
      public void onNext(HelloResponse response) {
        sendRequest("hello from client");
        // Optionally you can call onCompleted() or onError() on 
        // the requestObserver to terminate the call.
      }

      @Override
      public void onCompleted() {
        // You should not call any method on requestObserver.
      }

      @Override
      public void onError(Throwable error) {
        // You should not call any method on requestObserver.
      }
    });
  }

}

It has nothing to do with the start() method.

The doc is also mentioning that you should not do things like

try {
  requestObserver.onCompleted();
} catch(RuntimeException e) {
  requestObserver.onError();
}

It's mostly for user's own StreamObserver implementations. StreamObserver's returned by gRPC never throws.

Ericaericaceous answered 8/10, 2020 at 23:46 Comment(4)
Thanks! That is a great answer! Please, elaborate a bit more - if onError() is called on the outbound observer, no more gRPC methods should be called from the outbound observer, right? but the execution of the method wrapping the outboundObserver.onError() is not halted by the outboundObserver.onError(), it should be stopped otherwise (like with throw new RuntimeException())? I am asking this, because I have a method full of outboundObserver.onError() after which the method containing them is not terminated.Wittman
outboundObserver.onError() is notifying the server with an error, aka, sends an error to the server and close the RPC stream. The RPC call is terminated. It does not cause anything to happen on the client side and does not affect the application code.Ericaericaceous
Thanks a lot, this is what I needed to know!Wittman
where in the docs does it say you shouldn't do the oncompelted and onerror?Erythrocytometer
S
1

the only thing you want to do is mark as return after calling the responseObserver.onError(); like below. because there is nothing to do after sending the error.

if(condition){
    responseObserver.onError(StatusProto.toStatusException(status));
    //this is the required part
    return;
}else{
   responseObserver.onComplete(DATA);
}
Sailesh answered 19/1, 2023 at 16:54 Comment(0)
E
0

I've extracted a template for GRPC streaming which sort of abstracts away a lot of the GRPC boilerplate that also addresses the the logic for onError. In the DechunkingStreamObserver

I use the following general pattern for GRPC streaming which is something along the lines of

META DATA DATA DATA META DATA DATA DATA

An example of where I would use it would be to take one form and transform it to another form.

message SavedFormMeta {
  string id = 1;
}

message SavedFormChunk {
  oneof type {
    SavedFormMeta meta = 1;
    bytes data = 2;
  }
}

rpc saveFormDataStream(stream SavedFormChunk) returns (stream SavedFormChunk) {}

I use a flag that would track the inError state to prevent further processing and catch exceptions on the onNext and onComplete both of which I redirect to onError which forwards the error to the server side.

The code below pulls the GRPC semantics and takes lamdas that do the processing.


/**
 * Dechunks a GRPC stream from the request and calls the consumer when a complete object is created.  This stops
 * further processing once an error has occurred.
 *
 * @param <T> entity type
 * @param <R> GRPC chunk message type
 * @param <S> GRPC message type for response streams
 */
class DechunkingStreamObserver<T, R, S> implements StreamObserver<R> {

    /**
     * This function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
     */
    private final BiFunction<T, R, T> combiner;

    /**
     * A function that takes in the assembled object and the GRPC response observer.
     */
    private final BiConsumer<T, StreamObserver<S>> consumer;

    /**
     * Predicate that returns true if it is a meta chunk indicating a start of a new object.
     */
    private final Predicate<R> metaPredicate;

    /**
     * this function gets the meta chunk and supplies a new object.
     */
    private final Function<R, T> objectSupplier;

    /**
     * GRPC response observer.
     */
    private final StreamObserver<S> responseObserver;

    /**
     * Currently being processed entity.
     */
    private T current = null;

    /**
     * In error state.  Starts {@code false}, but once it is set to {@code true} it stops processing {@link #onNext(Object)}.
     */
    private boolean inError = false;

    /**
     * @param metaPredicate    predicate that returns true if it is a meta chunk indicating a start of a new object.
     * @param objectSupplier   this function gets the meta chunk and supplies a new object
     * @param combiner         this function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
     * @param consumer         a function that takes in the assembled object and the GRPC response observer.
     * @param responseObserver GRPC response observer
     */
    DechunkingStreamObserver(
            final Predicate<R> metaPredicate,
            final Function<R, T> objectSupplier,
            final BiFunction<T, R, T> combiner,
            final BiConsumer<T, StreamObserver<S>> consumer,
            final StreamObserver<S> responseObserver) {

        this.metaPredicate = metaPredicate;
        this.objectSupplier = objectSupplier;
        this.combiner = combiner;
        this.consumer = consumer;
        this.responseObserver = responseObserver;
    }

    @Override
    public void onCompleted() {

        if (inError) {
            return;
        }
        try {
            if (current != null) {
                consumer.accept(current, responseObserver);
            }
            responseObserver.onCompleted();
        } catch (final Exception e) {
            onError(e);
        }

    }

    @Override
    public void onError(final Throwable throwable) {

        responseObserver.onError(throwable);
        inError = true;

    }

    @Override
    public void onNext(final R chunk) {

        if (inError) {
            return;
        }
        try {
            if (metaPredicate.test(chunk)) {
                if (current != null) {
                    consumer.accept(current, responseObserver);
                }
                current = objectSupplier.apply(chunk);
            } else {
                current = combiner.apply(current, chunk);
            }
        } catch (final Exception e) {
            onError(e);
        }
    }
}

I have 4 lamdas

  • Predicate<R> metaPredicate which takes in a chunk and returns whether the chunk is meta or not.
  • Function<R, T> objectSupplier which takes in a meta chunk and creates a new object that is used by your module.
  • BiFunction<T, R, T> combiner, which takes in a data chunk and the current object and returns a new object that contains the combination.
  • BiConsumer<T, StreamObserver<S>> consumer which will consume a completed object. It also passes in a stream observer in the case of sending new objects in response.
Erythrocytometer answered 10/2, 2022 at 2:58 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.