kafka s3 sink connector crashed when It gets NULL data
Asked Answered
C

2

7

I had a working s3 sink connector until the source connector sent a NULL value; s3 connector crashed. The problem occured when I deleted a record from MS SQL db. The source connector shipped the deletion information to s3 connector and s3 connector crashed. I deleted and recreated s3 connector with a different name, nothing changed.

    org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        ... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]

...and this is my s3 connector config:

    apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: "minio-connector1"
  labels:
    strimzi.io/cluster: mssql-minio-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    storage.class: io.confluent.connect.s3.storage.S3Storage  
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    tasks.max: '1'
    topics: filesql1.dbo.Files
    s3.bucket.name: dosyalar
    s3.part.size: '5242880'
    flush.size: '2'
    format: binary
    schema.compatibility: NONE
    max.request.size: "536870912"
    store.url: http://minio.dev-kik.io
    format.class: io.confluent.connect.s3.format.avro.AvroFormat
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

I have 2 questions:

1) How can I make s3 connector runing again?

2) It cannot be expected not to delete records from source database. How can I prevent s3 connector's crash again?

Catcall answered 24/5, 2020 at 10:34 Comment(0)
A
9

please take a look at connector documentation and look for behavior.on.null.values. You can set it to ignore.

Aegina answered 25/5, 2020 at 3:49 Comment(2)
Unfortunate the S3 sink connector does not support null values. Completely limits what it can transmit when backing up compacted topics. In fact, Confluent recommends using the S3 connector for Schema Registry's _schemas topic which is compacted. This means delete values for compacted topics do not get saved to S3, thus losing the ability to support hard deletes for Schema Registry (when you restore the hard delete is lost since the S3 connector ignores nulls).Malamute
This key also worked for io.confluent.connect.gcs.GcsSinkConnector class.Myocardiograph
D
0

Look at Page:

https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html#storage

enter image description here

You should set proper connector value to ignore it:

behavior.on.null.values=ignore
Dudeen answered 1/12, 2022 at 22:41 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.