How can I bulk upload JSON records to AWS OpenSearch index using a python client library?
I have a fairly large dataset of JSON objects that I would like to bulk index into AWS OpenSearch.

I cannot see how to achieve this using any of: boto3, awswrangler, opensearch-py, elasticsearch, elasticsearch-py.

Is there a way to do this without using a python request (PUT/POST) directly?

Note that this is not for: ElasticSearch, AWS ElasticSearch.

Many thanks!

Importunate answered 8/6, 2022 at 11:32 Comment(2)
Looks like opensearch-py can do this (terrible docs there.... :\ ) - Importunate
Call .bulk() by following opensearch.org/docs/latest/clients/python in the absence of any fuller docs - Importunate

I finally found a way to do it using opensearch-py, as follows.

First establish the client,

# First fetch credentials from environment defaults
# If you can get this far you probably know how to tailor them
# for your particular situation. Otherwise SO is a safe bet :)
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

credentials = boto3.Session().get_credentials()
region = 'eu-west-2' # for example

# Now set up the AWS 'Signer'
auth = AWSV4SignerAuth(credentials, region)

# And finally the OpenSearch client
host=f"...{region}.es.amazonaws.com" # fill in your hostname (minus the https://) here
client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

Phew! Let's create the data now:

# Spot the deliberate mistake(s) :D
document1 = {
    "title": "Moneyball",
    "director": "Bennett Miller",
    "year": "2011"
}

document2 = {
    "title": "Apollo 13",
    "director": "Richie Cunningham",
    "year": "1994"
}

data = [document1, document2]

TIP! Create the index if you need to -

my_index = 'my_index'

try:
    response = client.indices.create(my_index)
    print('\nCreating index:')
    print(response)
except Exception as e:
    # If, for example, my_index already exists, do not much!
    print(e)

This is where things go a bit nutty. I hadn't realised that every single bulk operation needs an, er, action, e.g. "index", "create", "update" or "delete", so let's define that now:

action={
    "index": {
        "_index": my_index
    }
}

You can read all about the bulk REST API in the OpenSearch documentation.

The next quirk is that the OpenSearch bulk API requires Newline Delimited JSON (see https://www.ndjson.org), which is basically a sequence of JSON objects, each serialized on its own line and terminated by a newline. Someone wrote on SO that this "bizarre" API looked like one designed by a data scientist; far from taking offence, I think that rocks. (I agree ndjson is weird though.)

Hideously, now let's build up the full JSON string, combining the data and actions. A helper fn is at hand!

import json

def payload_constructor(data, action):
    # "All my own work"
    # Each document is preceded by its action line; every line ends with "\n"

    action_string = json.dumps(action) + "\n"

    payload_string = ""

    for datum in data:
        payload_string += action_string
        this_line = json.dumps(datum) + "\n"
        payload_string += this_line
    return payload_string

OK so now we can finally invoke the bulk API. I suppose you could mix in all sorts of actions (out of scope here) - go for it!
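For anyone curious about mixing actions, here is a minimal sketch of how such a payload could be built. The helper name mixed_payload and the _id values are made up for illustration. It relies only on the documented bulk format: "index", "create" and "update" actions are followed by a source line, while "delete" has none.

```python
import json

def mixed_payload(operations):
    """Serialise (action, source) pairs to NDJSON; source is None for deletes."""
    lines = []
    for action, source in operations:
        lines.append(json.dumps(action))
        if source is not None:
            lines.append(json.dumps(source))
    # The bulk body must end with a newline
    return "\n".join(lines) + "\n"

# Hypothetical operations, for illustration only
operations = [
    ({"index": {"_index": "my_index", "_id": "1"}}, {"title": "Moneyball"}),
    ({"update": {"_index": "my_index", "_id": "1"}}, {"doc": {"year": "2011"}}),
    ({"delete": {"_index": "my_index", "_id": "2"}}, None),
]
payload = mixed_payload(operations)
print(payload)
```

The resulting string could then be passed as body= to client.bulk() in the same way as the single-action payload.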

response=client.bulk(body=payload_constructor(data,action),index=my_index)

That's probably the most boring punchline ever but there you have it.
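One thing worth doing with that response: the bulk API can return successfully even when individual documents were rejected, so it pays to look at the "errors" flag and the per-item results. A rough sketch follows, assuming the usual bulk response shape (a top-level "errors" boolean plus an "items" list of one-key dicts). The function name failed_items and the sample response are made up for illustration.

```python
def failed_items(response):
    """Return (action, status, error) for each bulk item that carries an error."""
    failures = []
    if not response.get("errors"):
        return failures
    for item in response.get("items", []):
        # Each item is a one-key dict such as {"index": {...}}
        for action, result in item.items():
            if "error" in result:
                failures.append((action, result.get("status"), result["error"]))
    return failures

# Hand-written response in the expected shape, for illustration only
sample = {
    "took": 5,
    "errors": True,
    "items": [
        {"index": {"_index": "my_index", "_id": "a", "status": 201}},
        {"index": {"_index": "my_index", "_id": "b", "status": 400,
                   "error": {"type": "mapper_parsing_exception",
                             "reason": "failed to parse"}}},
    ],
}
print(failed_items(sample))
```

In practice you would call failed_items(response) on whatever client.bulk() returned and log or retry the failures.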

You can also just get (geddit) .bulk() to take the index from its index= argument and set the action to:

action={"index": {}}

Hey presto!
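Since the question mentions a large dataset, one string holding every record may get unwieldy. Below is a sketch of one way to split the work into batches. The name chunked_payloads and the batch_size default of 500 are arbitrary choices for illustration, not anything opensearch-py prescribes.

```python
import json
from itertools import islice

def chunked_payloads(documents, action, batch_size=500):
    """Yield NDJSON bulk bodies holding at most batch_size documents each."""
    action_line = json.dumps(action)
    iterator = iter(documents)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        lines = []
        for doc in batch:
            lines.append(action_line)
            lines.append(json.dumps(doc))
        yield "\n".join(lines) + "\n"

# Tiny made-up dataset to show the batching
docs = ({"n": i} for i in range(5))
bodies = list(chunked_payloads(docs, {"index": {}}, batch_size=2))
print(len(bodies))
```

Each yielded body could then be sent with client.bulk(body=body, index=my_index), one request per batch.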

Now, choose your poison - the other solution looks crazily shorter and neater.

PS The well-hidden opensearch-py documentation on this is located here.

Importunate answered 9/6, 2022 at 11:19 Comment(0)
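import awswrangler as wr

# host, username and password are placeholders: supply your own values
conn = wr.opensearch.connect(
    host=host,  # domain endpoint
    port=443,
    username=username,
    password=password
)

def insert_index_data(data, index_name='stocks', delete_index_data=False):
    """ Bulk Create
        args: data [{doc1}, {doc2}, ...]
    """
    if delete_index_data:
        # Drop the existing index first; wr.opensearch.delete_index stands in
        # for the original class's own delete helper, which was not shown
        wr.opensearch.delete_index(conn, index_name)

    resp = wr.opensearch.index_documents(
        conn,
        documents=data,
        index=index_name
    )
    print(resp)
    return resp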
Kerrykersey answered 8/6, 2022 at 23:56 Comment(3)
import awswrangler as wr. Am still looking for how to do bulk insert with opensearch-py - Kerrykersey
That's cool you've tackled it with the wrangler! I'll take a look asap :) - Importunate
Ok this is the first not too complicated and clearer solution that worked for me. It takes the opensearch-py client and a json file and that is pretty much it. Thanks! - Akkadian

I have used the code below to bulk insert records from Postgres into OpenSearch (ES 7.2).

import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import numpy as np
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
import json

engine = sa.create_engine('postgresql+psycopg2://postgres:<password>@<host>:5432/postgres')  # fill in your own password and host

host = 'search-xxxxxxxxxx.us-east-1.es.amazonaws.com'
port = 443
auth = ('username', 'password') # For testing only. Don't store credentials in code.

# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True,
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    ssl_assert_hostname = False,
    ssl_show_warn = False
)

        


with engine.connect() as connection:
    result = connection.execute(text("select * from account_1_study_1.stg_pred where domain='LB'"))
    records = []
    for row in result:
        # row._mapping gives a dict-like view of the row (SQLAlchemy 1.4+)
        record = dict(row._mapping)
        # Flatten the nested item_dataset column into top-level fields
        record.update(record['item_dataset'])
        del record['item_dataset']
        records.append(record)
    df = pd.DataFrame(records)
    # Replace missing values with the string "null" before indexing
    df = df.fillna("null")
    print(df.columns)
    documents = df.to_dict(orient='records')

    bulk(client, documents, index='search-irl-poc-dump', raise_on_error=True, refresh=True)
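If the load may be re-run, it can help to give each document an explicit _id so that a second run overwrites rather than duplicates. Here is a small sketch of building helper-style action dicts for that. The function name to_actions, the id_field parameter and the sample rows are hypothetical. The _index, _id and _source keys are the ones the bulk helper recognises.

```python
def to_actions(documents, index_name, id_field):
    """Yield one helper-style action dict per document."""
    for doc in documents:
        yield {
            "_index": index_name,
            "_id": doc[id_field],   # reuse a unique column as the document id
            "_source": doc,
        }

# Made-up rows, for illustration only
rows = [{"sku": "a1", "qty": 3}, {"sku": "b2", "qty": 7}]
actions = list(to_actions(rows, "sample-index", "sku"))
print(actions[0])
```

The generator could be handed straight to the helper, e.g. bulk(client, to_actions(documents, 'search-irl-poc-dump', 'some_unique_column')), where the column name is whatever uniquely identifies a row in your data.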
Targum answered 2/7, 2022 at 7:50 Comment(0)
