elasticsearch python bulk api (elasticsearch-py)
Asked Answered
I'm confused about py-elasticsearch bulk. @Diolor's solution at https://mcmap.net/q/258051/-how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python works, but I would like to use plain es.bulk().

My code:

from elasticsearch import Elasticsearch
es = Elasticsearch()
doc = '''\n {"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]} \n'''
result = es.bulk(index="logstash-test", doc_type="test", body=doc)

The ERROR is:

 No handlers could be found for logger "elasticsearch"
Traceback (most recent call last):
  File "./log-parser-perf.py", line 55, in <module>
    insertToES()
  File "./log-parser-perf.py", line 46, in insertToES
    res = es.bulk(index="logstash-test", doc_type="test", body=doc)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/client/utils.py", line 70, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/client/__init__.py", line 570, in bulk
    params=params, body=self._bulk_body(body))
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/transport.py", line 274, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/connection/http_urllib3.py", line 57, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch-1.0.0-py2.7.egg/elasticsearch/connection/base.py", line 83, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.TransportError: TransportError(500, u'ActionRequestValidationException[Validation Failed: 1: no requests added;]')

Generated url for POST call is

/logstash-test/test/_bulk

and the POST body is:

{"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]}

So I tried the curl by hand. This curl does not work:

> curl -XPUT http://localhost:9200/logstash-test/test2/_bulk -d
> '{"host":"logsqa","path":"/logs","message":"test
> test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]}
> '
>
> {"error":"ActionRequestValidationException[Validation Failed: 1: no requests added;]","status":500}

So the error itself makes sense, but I expected elasticsearch.bulk() to handle the input args properly.

The Python function's docstring is:

bulk(*args, **kwargs)
    :arg body: The operation definition and data (action-data pairs), as
        either a newline separated string, or a sequence of dicts to
        serialize (one per row).
    :arg index: Default index for items which don't provide one
    :arg doc_type: Default document type for items which don't provide one
    :arg consistency: Explicit write consistency setting for the operation
    :arg refresh: Refresh the index after performing the operation
    :arg routing: Specific routing value
    :arg replication: Explicitly set the replication type (default: sync)
    :arg timeout: Explicit operation timeout
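Per the docstring, the body can be a newline-separated string of action/source pairs. A minimal sketch of building such a body for the document above (the es.bulk call itself is left commented out, since it needs a running cluster):

```python
import json

# A bulk body must alternate action lines and source lines (NDJSON).
# The body in the question had only the source document, hence
# "Validation Failed: 1: no requests added".
action = {"index": {}}  # index/doc_type fall back to the call's defaults
source = {"host": "logsqa", "path": "/logs", "message": "test test",
          "@timestamp": "2014-10-02T10:11:25.980256",
          "tags": ["multiline", "mydate_0.005"]}

body = json.dumps(action) + "\n" + json.dumps(source) + "\n"
# result = es.bulk(index="logstash-test", doc_type="test", body=body)
```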
Skink answered 2/10, 2014 at 11:35 Comment(1)
I would advise you to use helpers.bulk() unless you want to do something more complex, of course. You can read the source code of how helpers.bulk is made and implement your own if you wish. helpers.bulk wraps helpers.streaming_bulk, which finally wraps es.bulk. – Ropeway
In case someone is currently trying to use the bulk api and wondering what the format should be, here's what worked for me:

doc = [
    {
        'index':{
            '_index': index_name,
            '_id' : <some_id>,
            '_type':<doc_type>
        }
    },
    {
        'field_1': <value>,
        'field_2': <value>
    }
]

import json

docs_as_string = json.dumps(doc[0]) + '\n' + json.dumps(doc[1]) + '\n'
client.bulk(body=docs_as_string)
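The same action/source pairing generalizes to many documents. A hedged sketch (the index name and field names here are made up for illustration):

```python
import json

# Hypothetical documents; "_id" is pulled out into the action line,
# everything else stays in the source line.
docs = [
    {"_id": 1, "field_1": "a"},
    {"_id": 2, "field_1": "b"},
]

lines = []
for d in docs:
    d = dict(d)  # copy, so the caller's dicts aren't mutated
    meta = {"index": {"_index": "my-index", "_id": d.pop("_id")}}
    lines.append(json.dumps(meta))
    lines.append(json.dumps(d))  # remaining keys are the source document
body = "\n".join(lines) + "\n"
# client.bulk(body=body)
```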
Pinprick answered 5/5, 2016 at 20:26 Comment(0)
From @HonzaKral on github

https://github.com/elasticsearch/elasticsearch-py/issues/135

Hi sirkubax,

the bulk api (as do all the others) follows very closely the bulk api format for elasticsearch itself, so the body would have to be:

doc = '''{"index": {}}\n{"host":"logsqa","path":"/logs","message":"test test","@timestamp":"2014-10-02T10:11:25.980256","tags":["multiline","mydate_0.005"]}\n'''

for it to work. Alternatively, it could be a list of those two dicts.

This is a complicated and clumsy format to work with from Python, which is why I tried to create a more convenient way to work with bulk in elasticsearch.helpers.bulk (0). It simply accepts an iterator of documents, extracts any optional metadata from them (like _id, _type, etc.), and constructs (and executes) the bulk request for you. For more info on the accepted formats, see the docs for streaming_bulk, a helper that processes the stream in an iterative manner (one document at a time from the user's point of view, batched in chunks in the background).

Hope this helps.

0 - http://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.bulk
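The metadata extraction described above can be sketched roughly as follows. This is a simplified illustration, not the helper's actual code: the real helper supports more metadata fields and also handles chunking and error collection.

```python
import json

# Metadata keys the sketch recognizes in a helper-style document dict.
META_KEYS = {"_index", "_type", "_id"}

def expand_action(doc):
    """Split one helper-style dict into an (action, source) NDJSON pair."""
    doc = dict(doc)  # copy, so the caller's dict isn't mutated
    op_type = doc.pop("_op_type", "index")
    meta = {k: doc.pop(k) for k in list(doc) if k in META_KEYS}
    return json.dumps({op_type: meta}), json.dumps(doc)

action, source = expand_action(
    {"_index": "logstash-test", "_id": 42, "host": "logsqa"})
```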

Skink answered 3/10, 2014 at 5:41 Comment(1)
Could you perhaps add a working example? It is still a bit unclear to me what the exact syntax of a bulk query would be. – Fusion
