am getting below exception while connecting Event Hub via kafka libraries.
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative
ERROR [pool-6-thread-1] STREAM - code="SAE-SP-A-1000: Stream processing failed, exiting ...",exception="Invalid SASL mechanism response, server may be expecting a different protocol"
org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:298) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:678) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:501) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveToken(SaslClientAuthenticator.java:435) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:259) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) ~[kafka-clients-2.2.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]
The consumer properties are as given below:
bootstrap.servers=XXX-topics.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://XXXX-topics.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************************";