I am streaming data from Kafka to BigQuery using Apache Beam with the Google Dataflow runner. I wanted to make use of insertId for deduplication, which I found described in the Google docs. But even though the inserts happen within a few seconds of each other, I still see a lot of rows with the same insertId. Now I'm wondering whether I am perhaps not using the API correctly to take advantage of the deduplication mechanism that BigQuery offers for streaming inserts.
My Beam code for writing looks as follows:
payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
    .withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
    .to(bqTradePaymentTable)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Besides all the other fields, I am setting insertId directly on the TableRow in the FxTableRowConverter.convertFxPaymentToTableRow method that is passed to BigQueryIO as the format function:
row.set("insertId", insertId);
I also added that field as a column in BQ; without it, the inserts were failing (obviously). I couldn't find any other way to set insertId on BigQueryIO than adding it to the TableRow object.
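For context, the converter is roughly shaped like this. It's a simplified sketch: the payment columns, the FxPayment getters, and the way insertId is derived are placeholders for my real mapping; the only relevant part is that insertId is set as a plain field on the row:

import com.google.api.services.bigquery.model.TableRow;

public class FxTableRowConverter {

    public static TableRow convertFxPaymentToTableRow(FxPayment payment) {
        TableRow row = new TableRow();
        // actual payment columns are set here (placeholder names below)
        row.set("paymentId", payment.getId());
        row.set("amount", payment.getAmount());
        // insertId ends up as just another column value on the row,
        // since I couldn't find a way to pass it to BigQueryIO directly
        row.set("insertId", payment.getId());
        return row;
    }
}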
Is this the correct way of using it? It does not work for me: I am seeing many duplicates even though I shouldn't, since, as I already mentioned, the inserts happen within seconds of each other. The BigQuery docs state that the streaming buffer keeps insertId for at least one minute.