Reindexing Elasticsearch via the Bulk API, scan and scroll
Asked Answered

I am trying to re-index my Elasticsearch setup. I am currently looking at the Elasticsearch documentation and an example using the Python API.

I'm a little bit confused as to how this all works though. I was able to obtain the scroll ID from the Python API:

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index=index, doc_type="my-doc-type", body=query, search_type="scan", scroll="10m")

scroll_id = response["_scroll_id"]

Now my question is: what use is this to me? What does knowing the scroll ID even give me? The documentation says to use the "Bulk API", but I have no idea how the scroll_id factors into this; it was a little confusing.

Could anyone give a brief example showing me how to re-index from this point, assuming I've obtained the scroll_id correctly?

Edgerton answered 14/10, 2014 at 22:8 Comment(0)

Here is an example of reindexing to another Elasticsearch node using elasticsearch-py:

from elasticsearch import Elasticsearch, helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)

You can also reindex the result of a query to a different index. Here is how to do it:

from elasticsearch import Elasticsearch, helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
Unmannered answered 14/1, 2016 at 13:30 Comment(0)

You can use the scroll API to go through all the documents in the most efficient way. The scroll_id identifies a session stored on the server for your specific scroll request, so you need to provide the scroll_id with each request to obtain the next batch of items.

The bulk API is for indexing documents more efficiently. When copying an index you need both, but they are not directly related.
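
To make the relationship concrete, here is a minimal Python sketch of the same pattern (hypothetical index names, assuming the elasticsearch-py client; `hits_to_actions` and `reindex_with_scroll` are helpers invented for illustration, not part of the client):

```python
def hits_to_actions(hits, target_index):
    """Turn one page of scroll hits into bulk 'index' actions."""
    for hit in hits:
        yield {
            "_op_type": "index",
            "_index": target_index,
            "_id": hit["_id"],
            "_source": hit["_source"],
        }

def reindex_with_scroll(es, source_index, target_index, scroll="5m", size=500):
    """Scan over source_index and bulk-index every page into target_index."""
    from elasticsearch import helpers  # needs elasticsearch-py installed

    # The first search opens the scroll session; the returned _scroll_id
    # is the cursor the server uses to remember where you are.
    rs = es.search(index=source_index, scroll=scroll, size=size,
                   body={"query": {"match_all": {}}})
    while rs["hits"]["hits"]:
        helpers.bulk(es, hits_to_actions(rs["hits"]["hits"], target_index))
        # Every follow-up request must send the scroll_id back; that is
        # the only thing tying the requests together into one session.
        rs = es.scroll(scroll_id=rs["_scroll_id"], scroll=scroll)
```

The scroll loop needs a live client, but `hits_to_actions` is pure and shows exactly what the bulk API consumes.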

I do have some Java code that might help you get a better idea of how it works.

public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; // Break condition: no hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}
Subheading answered 14/10, 2014 at 22:17 Comment(0)

For anyone who runs into this problem, you can use the following API from the Python client to reindex:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

This saves you from having to scan and scroll through all the data yourself and then call the bulk API to put it into the new index.

Cutpurse answered 14/5, 2015 at 23:57 Comment(2)
It's a brief introduction, but the api is really useful to me, thanks :)Evetta
Worked great, you can use it differentially with a date "range" search to update to the same new index so very useful.Mcbee

The best way to reindex is to use Elasticsearch's built-in Reindex API, as it is well supported and resilient to known issues.
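
As a sketch, the Reindex API can be called from elasticsearch-py like this (hypothetical index names and host; `reindex_body` is a helper invented here to build the request body):

```python
def reindex_body(source_index, target_index, query=None):
    """Build the request body for the _reindex endpoint."""
    body = {"source": {"index": source_index},
            "dest": {"index": target_index}}
    if query is not None:
        # Optionally restrict the copy to documents matching a query.
        body["source"]["query"] = query
    return body

if __name__ == "__main__":
    # Needs a running cluster and elasticsearch-py; the host is an assumption.
    from elasticsearch import Elasticsearch
    es = Elasticsearch(["localhost:9200"])
    # The cluster does the scroll + bulk work server-side.
    es.reindex(body=reindex_body("src-index", "dst-index"),
               wait_for_completion=True)
```

Unlike a client-side scroll loop, a failed node or client restart does not leave you holding a dead scroll cursor; the task runs on the cluster.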

The Elasticsearch Reindex API uses scroll and bulk indexing in batches, and allows for scripted transformation of data. In Python, a similar routine could be developed:

#!/usr/local/bin/python
from elasticsearch import Elasticsearch
from elasticsearch import helpers

src = Elasticsearch(['localhost:9202'])
dst = Elasticsearch(['localhost:9200'])

body = {"query": { "match_all" : {}}}

source_index = 'src-index'
target_index = 'dst-index'
scroll_time = '60s'
batch_size = 500

def transform(hits):
    for h in hits:
        h['_index'] = target_index
        yield h

rs = src.search(index=[source_index],
                scroll=scroll_time,
                size=batch_size,
                body=body)

helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)

while True:
    scroll_id = rs['_scroll_id']
    rs = src.scroll(scroll_id=scroll_id, scroll=scroll_time)
    if len(rs['hits']['hits']) > 0:
        helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
    else:
        break
Electrocute answered 17/6, 2019 at 20:12 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.