elasticsearch scrolling using python client
H

5

8

When scrolling in elasticsearch it is important to provide at each scroll the latest scroll_id:

The initial search request and each subsequent scroll request returns a new scroll_id — only the most recent scroll_id should be used.

The following example (taken from here) puzzles me. First, the scrolling initialization:

rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'], 
               scroll='10s', 
               search_type='scan', 
               size=100, 
               preference='_primary_first',
               body={
                 "fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query" : {
                     "wildcard" : { "entities.urls.expanded_url" : "*.ru" }
                   }
               }
   )
sid = rs['_scroll_id']

and then the looping:

tweets = []
while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break

It works, but I don't see where sid is updated... I believe that it happens internally, in the python client; but I don't understand how it works...

Halfassed answered 28/7, 2014 at 14:15 Comment(1)
When does this break out of the loop? I tried this approach, but it kept going and I had to stop it manually. Thanks. Stander
H
25

This is an old question, but for some reason came up first when searching for "elasticsearch python scroll". The python module provides a helper method to do all the work for you. It is a generator function that will return each document to you while managing the underlying scroll ids.

https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan

Here is an example of usage:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
    "query": {"match_all": {}}
}

es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])
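Because scan returns a generator, it can be wrapped to process hits in fixed-size batches rather than one at a time. The following is a minimal sketch under that assumption; iter_batches is a hypothetical helper, not part of elasticsearch-py, and it accepts any iterable of hits so it can be tried without a cluster.

```python
from itertools import islice


def iter_batches(hits, batch_size):
    """Yield lists of at most batch_size items from any iterable of hits."""
    iterator = iter(hits)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            # the underlying iterable (e.g. the scan generator) is exhausted
            return
        yield batch


# Usage with the helper above (needs a reachable cluster; "my-index" is a placeholder):
# for batch in iter_batches(scan(es, index="my-index", query=query), 500):
#     handle(batch)  # handle() is a hypothetical function of your own
```

The batches are built lazily, so only one batch is held in memory at a time.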
Honoria answered 14/8, 2020 at 17:27 Comment(0)
M
9

Using python requests

import requests
import json

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"],
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print('Error: Elastic Search: %s' % str(r1.json()))
while data:
    print(data)
    # scroll to get next batch data
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print(err_msg % str(scroll_res.json()))

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll
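The search context stays open until the keep-alive expires, and the scroll reference also describes a clear-scroll call (DELETE on _search/scroll) to release it early. Here is a rough sketch of building that request; clear_scroll_request is a hypothetical helper name, and the base URL is an assumption matching the localhost address used above.

```python
import json


def clear_scroll_request(scroll_id, base_url="http://localhost:9200"):
    """Build keyword arguments for requests.request() that clear one scroll context."""
    return {
        "method": "DELETE",
        "url": base_url + "/_search/scroll",
        "data": json.dumps({"scroll_id": scroll_id}),
        "headers": {"Content-Type": "application/json"},
    }


# Usage after the while loop, once a scroll id is known:
# if _scroll_id is not None:
#     requests.request(**clear_scroll_request(_scroll_id))
```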

Mackle answered 10/4, 2018 at 12:31 Comment(3)
Hey @anjan, can we use this with Elasticsearch version 5? I guess scroll is only in Elasticsearch version 6. Pecuniary
@Pecuniary I think it is only available in v6, so we can't use it in v5. Mackle
I found a way... let me know if you would like to know. Pecuniary
L
5

In fact the code has a bug in it - in order to use the scroll feature correctly you are supposed to use the new scroll_id returned with each new call in the next call to scroll(), not reuse the first one:

Important

The initial search request and each subsequent scroll request returns a new scroll_id — only the most recent scroll_id should be used.

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html

It works because Elasticsearch does not always change the scroll_id between calls; for smaller result sets it can keep returning the originally returned scroll_id for some time. In this discussion from last year, two other users see the same behavior, with the same scroll_id being returned for a while:

http://elasticsearch-users.115913.n3.nabble.com/Distributing-query-results-using-scrolling-td4036726.html

So while your code is working for a smaller result set it's not correct - you need to capture the scroll_id returned in each new call to scroll() and use that for the next call.
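As a minimal sketch of that fix (not the original author's code), the function below carries the latest _scroll_id into each call and stops on the first empty page instead of waiting for an exception. The name collect_scroll_hits and its parameters are hypothetical; es is assumed to be an Elasticsearch client and first_response the result of the initial es.search(..., scroll=...) call.

```python
def collect_scroll_hits(es, first_response, keep_alive="10s"):
    """Collect every hit of a scroll, always passing on the latest scroll_id."""
    # with search_type='scan' the first response carries no hits, so this may be empty
    hits = list(first_response["hits"]["hits"])
    sid = first_response["_scroll_id"]
    while True:
        rs = es.scroll(scroll_id=sid, scroll=keep_alive)
        sid = rs["_scroll_id"]  # use the most recent scroll_id for the next call
        page = rs["hits"]["hits"]
        if not page:
            # an empty page means the scroll is exhausted
            break
        hits.extend(page)
    return hits
```

With the example from the question, this would be called as tweets = collect_scroll_hits(es, rs).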

Leyte answered 28/7, 2014 at 15:42 Comment(4)
Just to make it clear: the bug is with the code in the example, right? No bug in the python client I hope. Halfassed
If this is indeed the case, as I assume, could you please suggest how to correct the example? Merely adding something like sid=rs['_scroll_id'] inside the try doesn't seem to work. Halfassed
Runs into an infinite loop. The while loop doesn't end. Halfassed
That's the second bug in the code from that site - should not be a while loop waiting for an exception, since when scroll is called correctly I don't believe it throws one. You should check to see if any hits are received in that iteration of the loop - if not then you are done. Leyte
P
2

import logging
import sys

import pandas as pd
import requests

# instance attributes used below (assumed to be set in the class constructor)
self._elkUrl = "http://Hostname:9200/logstash-*/_search?scroll=1m"
self._scrollUrl = "http://Hostname:9200/_search/scroll"


    def GetDataFromELK(self):
        """Get the data from ELK through the scrolling mechanism."""
        # implementing scroll and retrieving data from elk to get more than 100000 records at one search
        # ref: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html
        try:
            if self._elkUrl is None:
                raise ValueError("_elkUrl is missing")
            if self._username is None:
                raise ValueError("_username for elk is missing")
            if self._password is None:
                raise ValueError("_password for elk is missing")
            if self._scrollUrl is None:
                raise ValueError("scroll url is missing")
            auth = (self._username, self._password)
            response = requests.post(self._elkUrl, json=self.body, auth=auth).json()
            frames = []
            data = response['hits']['hits']
            # an empty page means the scroll is exhausted
            while len(data) > 0:
                # process the current page, including the first one from the initial search
                frames.append(self.DataClean(data))
                # pass the most recent scroll_id and keep the search context alive for 2m
                scroll_query = {"scroll": "2m", "scroll_id": response['_scroll_id']}
                response = requests.post(self._scrollUrl, json=scroll_query, auth=auth).json()
                data = response['hits']['hits']
            # DataFrame.append was removed in pandas 2.0, so the pages are concatenated once
            dataFrame = pd.concat(frames) if frames else pd.DataFrame()
            print('Total records received from ELK =', len(dataFrame))
            return dataFrame
        except Exception as e:
            logging.error('Error while getting the data from elk', exc_info=e)
            sys.exit()

Ptolemy answered 26/5, 2020 at 20:32 Comment(0)
J
2
from elasticsearch import Elasticsearch

elasticsearch_user_name ='es_username'
elasticsearch_user_password ='es_password'
es_index = "es_index"

es = Elasticsearch(["127.0.0.1:9200"],
                   http_auth=(elasticsearch_user_name, elasticsearch_user_password))


query = {
  "query": {
     "bool": {
        "must": [
            {
            "range": {
                "es_datetime": {
                    "gte": "2021-06-21T09:00:00.356Z",
                    "lte": "2021-06-21T09:01:00.356Z",
                    "format": "strict_date_optional_time"
                    }
                }
        }
    ]
 }
},
  "fields": [
      "*"
],
  "_source": False,
  "size": 2000,
}
resp = es.search(index=es_index, body=query, scroll="1m")
old_scroll_id = resp['_scroll_id']
results = resp['hits']['hits']
while len(results):
    for i, r in enumerate(results):
        # do something with the data
        pass
    result = es.scroll(
        scroll_id=old_scroll_id,
        scroll='1m'  # length of time to keep search context
    )
    # check if there's a new scroll ID
    if old_scroll_id != result['_scroll_id']:
        print("NEW SCROLL ID:", result['_scroll_id'])
    # keep track of the past scroll_id
    old_scroll_id = result['_scroll_id']

    results = result['hits']['hits']
Jabe answered 23/6, 2021 at 6:23 Comment(0)
