How to read Meta data in gRPC using Java at client side
Asked Answered
W

3

11

I am using Java and Protoc 3.0 compiler and my proto file is mention below. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

    // Terminates and removes an exisiting telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input                                 = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config          = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified, the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector  collector_list                      = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address                                          = 1;

    // Transport protocol port number for the collector destination.
    uint32 port                                             = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path                                             = 1;

    // Regular expression to be used in filtering state leaves
    string filter                                           = 2;

    // If this is set to true, the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged                                 = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set, the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval                              = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (e.g,. a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0, then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency                                 = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records                                     = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds                                = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response                           = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data send back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id                                        = 1;

    // line card / RE (slot number)
    uint32 component_id                                     = 2;

    // PFE (if applicable)
    uint32 sub_component_id                                 = 3;

    // Path specification for elements of OpenConfig data models
    string path                                             = 4;

    // Sequence number, monotonically increasing for each
    // system_id, component_id, sub_component_id + path.
    uint64 sequence_number                                  = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp                                        = 6;

    // List of key-value pairs
    repeated KeyValue kv                                    = 7;
}

// Simple Key-value, where value could be one of scalar types
message KeyValue {
    // Key
    string key                                              =  1;

    // One of possible values
    oneof value {
        double double_value                                 =  5;
        int64  int_value                                    =  6;
        uint64 uint_value                                   =  7;
        sint64 sint_value                                   =  8;
        bool   bool_value                                   =  9;
        string str_value                                    = 10;
        bytes  bytes_value                                  = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code                                         = 1;

    // Return code string
    string     code_str                                     = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS                                                 = 0;
    NO_SUBSCRIPTION_ENTRY                                   = 1;
    UNKNOWN_ERROR                                           = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list            = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then sent only agent-level
    // operational stats
    uint32 subscription_id                                  = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity                                = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL                                                  = 0;
    TERSE                                                   = 1;
    BRIEF                                                   = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    //     key      = operational state definition
    //     value    = operational state value
    repeated KeyValue kv                                    = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType  encoding_list                    = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED                                               = 0;
    XML                                                     = 1;
    JSON_IETF                                               = 2;
    PROTO3                                                  = 3;
}

In order to do the service call (rpc TelemetrySubscribe) first i need to read header which have subscription id and then start reading messages. Now, using Java i am able to connect with the service, i did introduce the interceptor but when i print/retrieve header it is null. My code of calling interceptor is below,

 ClientInterceptor interceptor = new HeaderClientInterceptor();
      originChannel = OkHttpChannelBuilder.forAddress(host, port)
        .usePlaintext(true)
        .build();
     Channel channel =  ClientInterceptors.intercept(originChannel, interceptor);
      telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

This is interceptor code to read meta Data.

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {

        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {

             Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

            System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));

Wondering is there any other way to read meta data or first message which have subscription ID in it? All i need to read first message which have subscription Id, and return the same subscription id to server so that streaming can start I have equivalent Python code using same proto file and it is communicating with server by code mention below for reference only:

     sub_req = SubscribeRequestMsg("host",port)
     data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
     metadata = data_itr.initial_metadata()

                   if metadata[0][0] == "responseKey":
                    metainfo = metadata[0][1]
                    print metainfo

                    subreply = agent_pb2.SubscriptionReply()
                    subreply.SetInParent()
                    google.protobuf.text_format.Merge(metainfo, subreply)

                    if subreply.response.subscription_id:
                    SUB_ID = subreply.response.subscription_id

From the python code above i can easily retrieve meta data object, not sure how to retrieve same using Java?

After reading metaData all i am getting is: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

But i know there is one more line from meta data to it, which is

response {
  subscription_id: 2
}

How can i extract last response from Header which have subscription id in it. I did try many options and i am lost here.

Wanitawanneeickel answered 18/4, 2017 at 18:1 Comment(0)
C
13

The method you used is for request metadata, not response metadata:

public void start(Listener<RespT> responseListener, Metadata headers) {

For response metadata, you will need a ClientCall.Listener and wait for the onHeaders callback:

public void onHeaders(Metadata headers)

I do feel like the usage of metadata you mention seems strange. Metadata is generally for additional error details or cross-cutting features that aren't specific to the RPC method (like auth, tracing, etc.).

Calibrate answered 18/4, 2017 at 22:54 Comment(5)
public void start(Listener<RespT> responseListener, Metadata headers) { ClientCall.Listener<RespT> listener = new ClientCall.Listener<RespT>() { @Override public void onHeaders(Metadata headers) { SubscriptionReply s = System.out.println("Header found1"+ headers); super.onHeaders(headers); } }; super.start(listener, headers); } }; It's now stuck connection is established but data is not flowing.Wanitawanneeickel
@Ammad, you aren't ever calling the passed-in Listener (responseListener). Extend SimpleForwardingClientCallListener and pass in responseListener to its constructor.Calibrate
I need some help here. Where to call the Listener? I can see that onHeaders() is invoked with some starting data.Wanitawanneeickel
SimpleForwardingClientCallListener calls the listener for you when you call super. So super.onHeaders() will call responseListener.onHeaders(). So simply extend SimpleForwardingClientCallListener and make sure to call super for each method you override.Calibrate
public void start(Listener<RespT> responseListener, Metadata headers) { responseListener = new SimpleForwardingClientCallListener<RespT>(responseListener) { Override public void onHeaders(Metadata headers) { System.out.println("Header found"+ headers); super.onHeaders(headers); } }; ClientCall.Listener<RespT> listener = new ClientCall.Listener<RespT>() { @Override public void onHeaders(Metadata headers) {Wanitawanneeickel
A
7

Often times using the ClientInterceptor is inconvenient because you need to maintain a reference to it in order to pull the data back out. In your case, the data is actually Metadata. One way you can get access to the Metadata easier is by putting it inside of the Context.

For example, you could create a Context.Key for the subscription id. In your client interceptor, you could extract the Metadata header that you want, and put it inside the Context, using Context.current().withValue(key, metadata). Inside your StreamObserver, you can extract this This by calling key.get(Context.current()). This assumes you are using the Async API, rather than the blocking API.

The reason it is more difficult is because usually metadata is information about a call, but not directly related to the call itself. It is for things like tracing, encoding, stats, cancellation and things like that. If something changes the way you handle the request, it probably needs to go directly into the request itself, rather than being on the side.

Aruwimi answered 18/4, 2017 at 22:55 Comment(6)
Hi Carl, I am just subscriber of the service. I can't put key for client to pull. All i can do is to receive header data and read the header key as first step, so that the Stream can start.Wanitawanneeickel
How can i create Context.key for subscription_id? Assuming subscription_id is string?Wanitawanneeickel
Creating a new Context per-call on the client-side dangerous, as it can form an infinitely long Context chain with the Async and Future APIs. The docs for ClientInterceptors now explicitly calls this out. The safer way on client-side is to use AbstractStub.withOption and retrieve the configuration via CallOptions.getOption.Calibrate
How on earth could I extract Metadata header in client interceptor in Java? There are only method, callOptions, and next parameters in interceptCall method of ClientInterceptor interface.Fabio
It's extracted via the ClientCall(.Listener) you create in the interceptor. The ClientCall intercepts the calls from the networkAruwimi
@CarlMastrangelo Thank you for your reply. Actually I happened to find what you said after I wrote reply. But, I'm stuck with how to put it inside to Context after that. I can't find right place to put new Context I got after calling 'withValue()' inside of the onHeaders method of a class implementing ForwardingClientCallListener.Fabio
E
2

In case this helps anyone else out:

I needed to write a specific response header in my gRPC-java server based upon the request/response.

What I ended up doing was storing the response header value in a Context using the Context::withValue (which doesn't modify the existing context but instead creates a new Context actually), and then calling the request service handler method's StreamObserver::onNext inside of the Context::run callback. StremableObserver::onNext calls the ServerCall::sendHeaders I have in my set from my ServerInterceptor. There in the sendHeaders, it can read the value in the Context I stored and set the response header value.

I think this is similar to @carl-mastrangelo's approach, just may be spelled out a little bit more.

public enum MyServerInterceptor implements ServerInterceptor {
    INSTANCE;

    public static final Metadata.Key<String> METADATA_KEY =
            Metadata.Key.of("fish", ASCII_STRING_MARSHALLER);

    public static final Context.Key<String> CONTEXT_KEY = Context.key("dog");

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                                                                 Metadata requestHeaders,
                                                                 ServerCallHandler<ReqT, RespT> next) {

        ServerCall<ReqT, RespT> myServerCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
                String value = CONTEXT_KEY.get(Context.current());
                responseHeaders.put(METADATA_KEY, value);
                super.sendHeaders(responseHeaders);
            }
        };

        return next.startCall(myServerCall, requestHeaders);
    }
}

public class MyService extends MyServiceGrpc.MyServiceyImplBase {
    @Override
    public void serviceMethod(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        MyResponse response = new MyResponse();

        Context.current().withValue(Context.key("dog"), "cat").run(() -> {
            responseObserver.onNext(response);
        });

        responseObserver.onCompleted();
    }
}

The again the critical piece here is that responseObserver::onNext calls into ForwardingServerCall.SimpleForwardingServerCall::sendHeaders.

Please let me know if there is a better way. This all seems more complex than I'd like.

Encipher answered 10/2, 2022 at 4:4 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.