I've extracted a template for GRPC streaming which sort of abstracts away a lot of the GRPC boilerplate that also addresses the the logic for onError
. In the DechunkingStreamObserver
I use the following general pattern for GRPC streaming which is something along the lines of
META DATA DATA DATA META DATA DATA DATA
An example of where I would use it would be to take one form and transform it to another form.
message SavedFormMeta {
string id = 1;
}
message SavedFormChunk {
oneof type {
SavedFormMeta meta = 1;
bytes data = 2;
}
}
rpc saveFormDataStream(stream SavedFormChunk) returns (stream SavedFormChunk) {}
I use a flag that would track the inError
state to prevent further processing and catch exceptions on the onNext
and onComplete
both of which I redirect to onError
which forwards the error to the server side.
The code below pulls the GRPC semantics and takes lamdas that do the processing.
/**
* Dechunks a GRPC stream from the request and calls the consumer when a complete object is created. This stops
* further processing once an error has occurred.
*
* @param <T> entity type
* @param <R> GRPC chunk message type
* @param <S> GRPC message type for response streams
*/
class DechunkingStreamObserver<T, R, S> implements StreamObserver<R> {
/**
* This function takes the current entity state and the chunk and returns a copy of the combined result. Note the combiner may modify the existing data, but may cause unexpected behaviour.
*/
private final BiFunction<T, R, T> combiner;
/**
* A function that takes in the assembled object and the GRPC response observer.
*/
private final BiConsumer<T, StreamObserver<S>> consumer;
/**
* Predicate that returns true if it is a meta chunk indicating a start of a new object.
*/
private final Predicate<R> metaPredicate;
/**
* this function gets the meta chunk and supplies a new object.
*/
private final Function<R, T> objectSupplier;
/**
* GRPC response observer.
*/
private final StreamObserver<S> responseObserver;
/**
* Currently being processed entity.
*/
private T current = null;
/**
* In error state. Starts {@code false}, but once it is set to {@code true} it stops processing {@link #onNext(Object)}.
*/
private boolean inError = false;
/**
* @param metaPredicate predicate that returns true if it is a meta chunk indicating a start of a new object.
* @param objectSupplier this function gets the meta chunk and supplies a new object
* @param combiner this function takes the current entity state and the chunk and returns a copy of the combined result. Note the combiner may modify the existing data, but may cause unexpected behaviour.
* @param consumer a function that takes in the assembled object and the GRPC response observer.
* @param responseObserver GRPC response observer
*/
DechunkingStreamObserver(
final Predicate<R> metaPredicate,
final Function<R, T> objectSupplier,
final BiFunction<T, R, T> combiner,
final BiConsumer<T, StreamObserver<S>> consumer,
final StreamObserver<S> responseObserver) {
this.metaPredicate = metaPredicate;
this.objectSupplier = objectSupplier;
this.combiner = combiner;
this.consumer = consumer;
this.responseObserver = responseObserver;
}
@Override
public void onCompleted() {
if (inError) {
return;
}
try {
if (current != null) {
consumer.accept(current, responseObserver);
}
responseObserver.onCompleted();
} catch (final Exception e) {
onError(e);
}
}
@Override
public void onError(final Throwable throwable) {
responseObserver.onError(throwable);
inError = true;
}
@Override
public void onNext(final R chunk) {
if (inError) {
return;
}
try {
if (metaPredicate.test(chunk)) {
if (current != null) {
consumer.accept(current, responseObserver);
}
current = objectSupplier.apply(chunk);
} else {
current = combiner.apply(current, chunk);
}
} catch (final Exception e) {
onError(e);
}
}
}
I have 4 lamdas
Predicate<R> metaPredicate
which takes in a chunk and returns whether the chunk is meta or not.
Function<R, T> objectSupplier
which takes in a meta chunk and creates a new object that is used by your module.
BiFunction<T, R, T> combiner,
which takes in a data chunk and the current object and returns a new object that contains the combination.
BiConsumer<T, StreamObserver<S>> consumer
which will consume a completed object. It also passes in a stream observer in the case of sending new objects in response.
onError()
is called on the outbound observer, no more gRPC methods should be called from the outbound observer, right? but the execution of the method wrapping theoutboundObserver.onError()
is not halted by theoutboundObserver.onError()
, it should be stopped otherwise (like withthrow new RuntimeException()
)? I am asking this, because I have a method full ofoutboundObserver.onError()
after which the method containing them is not terminated. – Wittman