Properly Configuring Kafka Connect S3 Sink TimeBasedPartitioner

I am trying to use the TimeBasedPartitioner of the Confluent S3 sink. Here is my config:

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "xxxxx",
    "s3.region": "yyyyyy",
    "s3.bucket.name": "zzzzzzz",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor": "Record",
    "timestamp.field": "local_timestamp",
    "path.format": "YYYY-MM-dd-HH",
    "partition.duration.ms": "3600000",
    "schema.compatibility": "NONE"
  }
}

The data is binary and I use an Avro schema for it. I want to partition the data by the actual record field "local_timestamp", which is a UNIX timestamp, say into hourly files.
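
For illustration, a value schema carrying such a field could look like the following sketch (everything apart from local_timestamp is a placeholder, and local_timestamp is assumed to be a UNIX timestamp in milliseconds):

{
  "type": "record",
  "name": "SensorEvent",
  "fields": [
    { "name": "local_timestamp", "type": "long" },
    { "name": "payload", "type": "bytes" }
  ]
}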

I start the connector with the usual REST API call:

curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors

Unfortunately, the data is not partitioned as I wish. I also tried removing flush.size because it might interfere, but then I got the error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%

Any idea how to properly set up the TimeBasedPartitioner? I could not find a working example.

Also, how can one debug such a problem or gain further insight into what the connector is actually doing?
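
For reference, one generic check is Connect's REST status endpoint, which reports the connector and task state and, for a failed task, its stack trace in the trace field (connector name taken from the config above):

curl -s http://localhost:8083/connectors/s3-sink/status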

Greatly appreciate any help or further suggestions.

Atlantis answered 6/1, 2018 at 15:4 Comment(0)

Indeed, your amended configuration seems correct.

Specifically, setting timestamp.extractor to RecordField lets you partition your files based on a timestamp field inside your records, which you identify with the property timestamp.field.

When timestamp.extractor=Record is set instead, the time-based partitioner uses each record's Kafka timestamp.

Regarding flush.size, setting this property to a high value (e.g. Integer.MAX_VALUE) is practically equivalent to ignoring it.

Finally, schema.generator.class is no longer required in the most recent versions of the connector.
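
Putting those points together, the relevant part of the configuration would look roughly like this (the flush.size value is just an illustratively large number, and timestamp.field must match the field name in your records):

"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
"flush.size": "2147483647"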

Mainly answered 8/1, 2018 at 6:21 Comment(0)

After studying the code at TimeBasedPartitioner.java and the logs with

confluent log connect tail -f

I realized that both timezone and locale are mandatory, although this is not specified as such in the Confluent S3 Connector documentation. The following config fields solve the problem and let me upload the records properly partitioned to S3 buckets:

"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",

Note two more things: first, a value for flush.size is still required; the files are eventually split into chunks containing no more than flush.size records. Second, the path.format is best chosen as shown above so that a proper tree structure is generated.
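
For instance, with the path.format above the objects should end up under keys of roughly this shape (the topics/ prefix comes from the default topics.dir, and the date, Kafka partition and offset values are illustrative):

s3://zzzzzzz/topics/xxxxx/year=2018/month=01/day=07/hour=17/xxxxx+0+0000000000.avro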

I am still not 100% sure whether the record field local_timestamp is really used to partition the records.
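
One way to check is to list the bucket and compare the partition directories with the local_timestamp values of a few records, e.g.:

aws s3 ls s3://zzzzzzz/topics/xxxxx/ --recursive | head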

Any comments or improvements are greatly welcome.

Atlantis answered 7/1, 2018 at 17:25 Comment(5)
Did the above work? I get PartitionException: Error encoding partition, which probably means (per the code that you linked above) my messages (Avro) were not deserialized correctly. Did you use Schema Registry? - Quagmire
Yes, I used Schema Registry and the above code worked. I guess it cannot work without Schema Registry. - Atlantis
Thanks Daniel. I also had to specify "value.converter": "io.confluent.connect.avro.AvroConverter" and "value.converter.schema.registry.url": <my_schema_registry-url> to make it work. By default, the converter was set to ByteArrayConverter, so I had to override it. - Quagmire
Oh, yes, that is needed; otherwise there is no way to understand the data structure. - Atlantis
Could you share a sample input record? I used the same configs, and my timestamp field is epoch in seconds; it does not work properly. - Spark