I'm using Spring Data Elasticsearch 4.2.5. We have a job that performs ETL (extract, transform, and load) into a particular database table, and I index this data into Elasticsearch while the job is running. The data runs to millions of records or more. Currently I index one document on every iteration, but I have read that issuing an index request per iteration can be slow. I would like to use something like bulk indexing, but that requires adding an IndexQuery object to a List, and collecting millions of records in one list before a single bulk index call may cause memory problems.
I need a similar process for deletion. When records are deleted by some common ID, I need to delete the related Elasticsearch documents, and there will also be millions of them or more.
Is there any way to make indexing and deleting fast for this requirement? Any help is much appreciated, and please correct me if my understanding is wrong.
INDEXING
for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
    indexDocument(elasticsearchOperations, key, oPath);
    // other code to insert data in db table...
}
private void indexDocument(ElasticsearchOperations elasticsearchOperations,
                           Map.Entry<Integer, ObjectDetails> key, String oPath) {
    ObjectDetails details = key.getValue();
    // Note: plain concatenation can collide (catalog 1 + object 23 and
    // catalog 12 + object 3 both give "123").
    String docId = "" + details.getCatalogId() + details.getObjectId();
    byte[] nameBytes = details.getName();
    byte[] physicalNameBytes = details.getPhysicalName();
    byte[] definitionBytes = details.getDefinition();
    byte[] commentBytes = details.getComment();
    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(docId)
            .withObject(new MetadataSearch(
                    details.getObjectId(),
                    details.getCatalogId(),
                    details.getParentId(),
                    details.getTypeCode(),
                    details.getStartVersion(),
                    details.getEndVersion(),
                    // Missing text fields are indexed as "-"
                    nameBytes != null ? new String(nameBytes, StandardCharsets.UTF_8) : "-",
                    physicalNameBytes != null ? new String(physicalNameBytes, StandardCharsets.UTF_8) : "-",
                    definitionBytes != null ? new String(definitionBytes, StandardCharsets.UTF_8) : "-",
                    commentBytes != null ? new String(commentBytes, StandardCharsets.UTF_8) : "-",
                    oPath
            ))
            .build();
    // One index request per document
    elasticsearchOperations.index(indexQuery, IndexCoordinates.of("portal_idx"));
}
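To make the memory concern concrete: bulk indexing does not require holding every record at once. Below is a minimal sketch of a fixed-size buffer that flushes each full chunk to a callback, so at most one chunk is in memory at a time. The class name BatchBuffer, the batch size, and the helper buildIndexQuery in the usage comment are assumptions for illustration and not part of Spring Data Elasticsearch. The callback is where a call such as elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("portal_idx")) would presumably go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Collects items and hands them to a flush callback in fixed-size chunks,
 * so at most batchSize items are buffered at any time.
 * (Hypothetical helper, not a Spring Data Elasticsearch class.)
 */
final class BatchBuffer<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer;

    BatchBuffer(int batchSize, Consumer<List<T>> flushAction) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flushAction = flushAction;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Adds one item and flushes as soon as the buffer is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends the buffered items (if any) to the callback and empties the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Pass a copy so the callback may keep the list after the buffer is cleared.
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Flushes the final partial batch; called automatically by try-with-resources. */
    @Override
    public void close() {
        flush();
    }
}

// Hypothetical wiring with the loop above (1000 is an arbitrary starting size,
// and buildIndexQuery is an assumed helper that returns the IndexQuery
// instead of indexing it):
//
// try (BatchBuffer<IndexQuery> batch = new BatchBuffer<>(1000,
//         queries -> elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("portal_idx")))) {
//     for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
//         batch.add(buildIndexQuery(key, oPath));
//     }
// }
```

The try-with-resources form matters here because the last chunk is usually smaller than the batch size and would otherwise never be sent. The right batch size depends on document size and cluster capacity, so it is worth measuring rather than assuming.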
DELETING
private void deleteElasticDocuments(String catalogId) {
    String queryText = martServerContext.getQueryCacheInstance().getQuery(QUERY_PORTAL_GET_OBJECTS_IN_PORTAL_BY_MODEL);
    MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
    mapSqlParameterSource.addValue("cId", Integer.parseInt(catalogId));
    namedParameterJdbcTemplate.query(queryText, mapSqlParameterSource, (resultSet -> {
        int objectId = resultSet.getInt(O_ID);
        String docId = catalogId + objectId;
        elasticsearchOperations.delete(docId, IndexCoordinates.of("portal_idx"));
    }));
}