Batch size in Kafka JDBC sink connector
I want to process only 5000 records per batch through the JDBC sink, so I've set batch.size in the JDBC sink config file:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=5000
topics=postgres_users

connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true

But batch.size has no effect: records are inserted into the target database as soon as new records are inserted into the source database, rather than in batches of 5000.

How can I insert records in batches of 5000?

Cant answered 25/10, 2019 at 4:52

There is no direct way to sink records in batches, but you can try tuning the properties below and see if they help. I have never tried this myself, but my understanding is that a Kafka sink connector is essentially a consumer that consumes messages from the topic.

max.poll.records: The maximum number of records returned in a single call to poll().

consumer.fetch.min.bytes: The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering.

fetch.wait.max.ms: The broker will wait for this amount of time before sending a response to the consumer client, unless it has enough data to fill the response (fetch.message.max.bytes).

fetch.min.bytes: The broker will wait for this amount of data to accumulate before it sends the response to the consumer client.
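A minimal sketch of how these might look for a sink connector, assuming they are set in the Connect worker properties file (worker-level consumer settings take the consumer. prefix; the values are illustrative, and in current Kafka versions the wait property is named fetch.max.wait.ms):

# worker.properties - illustrative values, not recommendations
# Cap each consumer poll (and thus each batch handed to the sink task) at 5000 records
consumer.max.poll.records=5000
# Ask the broker to hold the fetch until at least ~1 MB is available...
consumer.fetch.min.bytes=1048576
# ...or until this timeout expires, whichever comes first
consumer.fetch.max.wait.ms=500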

Christie answered 25/10, 2019 at 6:28
Thanks for the answer, it was helpful, but I am using the JDBC sink connector and I don't think we have these configs at the connector level. I also don't want these configurations to affect other connectors, so I can't set them globally. - Cant

In order to set the batch size you have two options:

  • Add max.poll.records=5000 (prefixed as consumer.max.poll.records) in the worker.properties file used by the Kafka Connect instance (standalone or distributed), as in the sketch below;
  • Set the same property in the connector configuration file (the JSON file for distributed connectors).
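A sketch of the first option, assuming a worker.properties file for the Connect instance (consumer settings at the worker level carry the consumer. prefix and apply to every sink connector on that worker):

# worker.properties (standalone or distributed Kafka Connect worker)
# Limits every sink task's consumer to at most 5000 records per poll
consumer.max.poll.records=5000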

For the second option, you have to:

  • Enable the possibility to override connect properties by adding connector.client.config.override.policy=All in the worker.properties
  • Configure the batch size with "consumer.override.max.poll.records" : 2000 in the connector configuration (notice the "consumer.override." prefix)
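A sketch of a distributed-mode connector configuration using this override; the connection details are copied from the question, and the override only takes effect if the worker's override policy above is already set:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres_users",
    "connection.url": "jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar",
    "auto.create": "true",
    "consumer.override.max.poll.records": "5000"
  }
}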
Hugh answered 9/3, 2021 at 17:34
