Spring Data Elasticsearch Bulk Index/Delete - Millions of Records

I'm using Spring Data Elasticsearch 4.2.5. We have an ETL job (extract, transform, load) that writes to a particular database table, and I index this data with Elasticsearch while the job is running. The data runs into millions of records and more. Currently I index on every iteration, and I've read that calling the index operation once per document can be slow. I'd like to use something like bulk indexing, but for that I need to add an IndexQuery object to a list for each document, and collecting millions of records in a list before a single bulk index may cause memory issues.

I need a similar process for deletion: when records are deleted based on some common ID, I need to delete the related Elasticsearch documents, and these will also number in the millions and more.

Is there any way to make indexing/deleting fast enough for this requirement? Any help is much appreciated, and please correct me if my understanding is incorrect.

INDEXING

for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
    indexDocument(elasticsearchOperations, key, oPath);
    // other code to insert data in db table...
}

private void indexDocument(ElasticsearchOperations elasticsearchOperations,
                              Map.Entry<Integer, ObjectDetails> key, String oPath) {
    String docId = "" + key.getValue().getCatalogId() + key.getValue().getObjectId();

    byte[] nameBytes = key.getValue().getName();
    byte[] physicalNameBytes = key.getValue().getPhysicalName();
    byte[] definitionBytes = key.getValue().getDefinition();
    byte[] commentBytes = key.getValue().getComment();

    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(docId)
            .withObject(new MetadataSearch(
                    key.getValue().getObjectId(),
                    key.getValue().getCatalogId(),
                    key.getValue().getParentId(),
                    key.getValue().getTypeCode(),
                    key.getValue().getStartVersion(),
                    key.getValue().getEndVersion(),
                    nameBytes != null ? new String(nameBytes, StandardCharsets.UTF_8) : "-",
                    physicalNameBytes != null ? new String(physicalNameBytes, StandardCharsets.UTF_8) : "-",
                    definitionBytes != null ? new String(definitionBytes, StandardCharsets.UTF_8) : "-",
                    commentBytes != null ? new String(commentBytes, StandardCharsets.UTF_8) : "-",
                    oPath
            ))
            .build();

    elasticsearchOperations.index(indexQuery, IndexCoordinates.of("portal_idx"));
}

DELETING

private void deleteElasticDocuments(String catalogId) {
    String queryText = martServerContext.getQueryCacheInstance().getQuery(QUERY_PORTAL_GET_OBJECTS_IN_PORTAL_BY_MODEL);
    MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
    mapSqlParameterSource.addValue("cId", Integer.parseInt(catalogId));
    namedParameterJdbcTemplate.query(queryText, mapSqlParameterSource, (resultSet -> {
        int objectId = resultSet.getInt(O_ID);
        String docId = catalogId + objectId;
        elasticsearchOperations.delete(docId, IndexCoordinates.of("portal_idx"));
    }));
}
Inappropriate answered 28/9, 2021 at 13:42 Comment(0)

For adding the documents you can use bulk indexing: collect the documents to index in a list (or any other collection), and when a predefined size is reached, say 500 entries, do a bulk insert of that batch and clear it.
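
A minimal sketch of that batching approach, written against the code in the question and assuming the MetadataSearch mapping and the portal_idx index from there; toMetadataSearch is a hypothetical helper standing in for the constructor call shown in the question, and 500 is an arbitrary batch size:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;

// fields and helpers inside the existing job class
private static final int BATCH_SIZE = 500;
private final List<IndexQuery> batch = new ArrayList<>(BATCH_SIZE);

private void indexDocument(ElasticsearchOperations elasticsearchOperations,
                           Map.Entry<Integer, ObjectDetails> entry, String oPath) {
    batch.add(new IndexQueryBuilder()
            .withId("" + entry.getValue().getCatalogId() + entry.getValue().getObjectId())
            .withObject(toMetadataSearch(entry.getValue(), oPath)) // hypothetical helper, mapping as in the question
            .build());

    // flush a full batch so memory stays bounded regardless of the total record count
    if (batch.size() >= BATCH_SIZE) {
        elasticsearchOperations.bulkIndex(batch, IndexCoordinates.of("portal_idx"));
        batch.clear();
    }
}

// call once after the ETL loop finishes to index the final partial batch
private void flushRemaining(ElasticsearchOperations elasticsearchOperations) {
    if (!batch.isEmpty()) {
        elasticsearchOperations.bulkIndex(batch, IndexCoordinates.of("portal_idx"));
        batch.clear();
    }
}

ElasticsearchOperations.bulkIndex(List<IndexQuery>, IndexCoordinates) sends the whole batch in one request, so the in-memory list never grows past BATCH_SIZE entries.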

For deleting there is no bulk operation, but you can again collect the IDs to delete in a list up to a maximum size, then use ElasticsearchOperations.idsQuery(List<String>) to create a query for these IDs and pass it into the delete(query) method; a sketch for 4.2.x follows after the edit below.

Edit 29.09.2021:

the idsQuery method was only just added on the 4.3 branch; it is implemented like this (https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java#L193-L200):

@Override
public Query idsQuery(List<String> ids) {

    Assert.notNull(ids, "ids must not be null");

    return new NativeSearchQueryBuilder().withQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[] {})))
            .build();
}
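
Since that method is not available on 4.2.5, here is a minimal sketch of the same batched deletion built inline, assuming the MetadataSearch class and portal_idx index from the question (deleteBatch is a hypothetical helper name):

import java.util.List;

import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;

// delete one batch of collected document IDs by building the ids query inline
private void deleteBatch(ElasticsearchOperations elasticsearchOperations, List<String> docIds) {
    Query query = new NativeSearchQueryBuilder()
            .withQuery(QueryBuilders.idsQuery().addIds(docIds.toArray(new String[0])))
            .build();
    // the entity class parameter lets the template resolve the document mapping
    elasticsearchOperations.delete(query, MetadataSearch.class, IndexCoordinates.of("portal_idx"));
}

Passing the entity class to delete(...) matters here; the comments below show that the overload without it failed with an InaccessibleObjectException.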

Mimosaceous answered 28/9, 2021 at 16:55 Comment(3)
Thanks for the hint, I will try this, but I didn't find anything like an idsQuery() method; can you please provide a sample example for the deletion part? – Inappropriate
This is from AbstractElasticsearchRestTransportTemplate. If I do the same thing in my code, like this: Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.idsQuery().addIds(docIdList.toArray(String[]::new))).build(); elasticsearchOperations.delete(query, IndexCoordinates.of("portal_idx")); then I get the exception java.lang.reflect.InaccessibleObjectException: Unable to make field private final transient java.net.InetSocketAddress – Inappropriate
Ignore the above: after doing the delete like this, along with the above code, it is working: elasticsearchOperations.delete(query, MetadataSearch.class, IndexCoordinates.of("portal_idx")); – Inappropriate

Using idsQuery for bulk delete is not the best option performance-wise. Under the hood it performs an ordinary query just to identify the documents to delete, even though we already know their IDs beforehand. We resorted to the following (snippet in Kotlin, using the opensearch-java client):

import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.opensearch.core.BulkRequest
import org.opensearch.client.opensearch.core.bulk.BulkOperation
import org.opensearch.client.opensearch.core.bulk.DeleteOperation

...

// document IDs are strings in the bulk API
val ids = listOf("1", "2", "3")
val index = "index"

// BulkOperation is a tagged union holding a single operation,
// so build one operation per ID instead of reusing one builder
val operations = ids.map { id ->
    BulkOperation.Builder()
        .delete(DeleteOperation.Builder().id(id).index(index).build())
        .build()
}

val bulkRequest = BulkRequest.Builder().operations(operations).build()

openSearchClient.bulk(bulkRequest)
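
Note that a successful bulk() call can still contain per-item failures; the returned BulkResponse exposes an errors() flag and per-item results via items(), which are worth checking after large batches.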
Persephone answered 3/7 at 13:50 Comment(0)